docs.rs failed to build taos-sys-0.12.2
Please check the
build logs for more information.
See
Builds for ideas on how to fix a failed build,
or
Metadata for how to configure docs.rs builds.
If you believe this is docs.rs' fault,
open an issue.
The Official Rust Connector for TDengine
Docs.rs |
Crates.io Version |
Crates.io Downloads |
CodeCov |
|
|
|
|
This is the official TDengine connector in Rust.
Dependencies
If you use the default features, it'll depend on:
Usage
By default, both the native and WebSocket clients are enabled:
[dependencies]
taos = "*"
For the WebSocket client only:
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
For the native client only:
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
Query
use chrono::{DateTime, Local};
use taos::*;
/// Query example: recreates the `query` database, fills the `meters` super
/// table with sample rows, then reads them back twice — once as raw
/// name/value pairs and once deserialized into a typed `Record`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    /// One row of ``select * from `meters` ``: the four data columns followed
    /// by the two tag columns declared in the `CREATE TABLE` statement below.
    /// The measurement columns are `Option`s because one sample row inserts
    /// `NULL` for each of them.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        ts: DateTime<Local>,
        current: Option<f32>,
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        location: String,
    }

    let dsn = "taos://localhost:6030";
    let connection_builder = TaosBuilder::from_dsn(dsn)?;
    let client = connection_builder.build()?;

    // Start from a clean database so the example can be re-run.
    let database = "query";
    let setup = [
        format!("DROP DATABASE IF EXISTS `{database}`"),
        format!("CREATE DATABASE `{database}`"),
        format!("USE `{database}`"),
    ];
    client.exec_many(setup).await?;

    // Schema plus sample data: two rows for `d0` and four rows for `d1`.
    let statements = [
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ];
    let affected_rows = client.exec_many(statements).await?;
    // Six value tuples were inserted above.
    assert_eq!(affected_rows, 6);

    // First pass: walk the result set generically.
    let mut result = client.query("select * from `meters`").await?;
    for field in result.fields() {
        println!("got field: {}", field.name());
    }

    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (column, cell) in row {
            println!("got value of {column}: {cell}");
        }
    }

    // Second pass: let serde map every row onto `Record`.
    let records: Vec<Record> = client
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
    dbg!(records);

    Ok(())
}
Subscription
use std::time::Duration;
use chrono::{DateTime, Local};
use taos::*;
/// One data row of the `meters` super table as consumed by the subscription
/// example below.
///
/// Only the four data columns declared in `main`'s `CREATE TABLE` statement are
/// modelled; the tag columns (`groupid`, `location`) are not part of this
/// struct.
#[derive(Debug, serde::Deserialize)]
// The fields are only ever read through the derived `Debug` output when the
// records are printed, so the dead-code lint is silenced.
#[allow(dead_code)]
struct Record {
// Row timestamp (`ts` TIMESTAMP column), in the local time zone.
ts: DateTime<Local>,
// The measurement columns are optional because `prepare` inserts a row whose
// measurements are all NULL.
current: Option<f32>,
voltage: Option<i32>,
phase: Option<f32>,
}
/// Produces the data the subscriber will receive: creates sub-table `d0`
/// (and, implicitly through `INSERT ... USING`, `d1`) under the `meters` super
/// table and inserts six sample rows.
///
/// Takes ownership of the connection so it can run as a spawned task.
/// Returns an error if any statement fails; panics if the count reported by
/// `exec_many` is not 6.
async fn prepare(taos: Taos) -> anyhow::Result<()> {
    let statements = [
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)",
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ];

    let affected_rows = taos.exec_many(statements).await?;
    // Two rows go into `d0` and four into `d1`.
    assert_eq!(affected_rows, 6);

    Ok(())
}
/// Subscription example: recreates the `tmq` database and the `tmq_meters`
/// topic, spawns a task that writes sample rows, then consumes the topic and
/// prints every block of records it receives.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    pretty_env_logger::init();

    let dsn = "taos://localhost:6030";
    let builder = TaosBuilder::from_dsn(dsn)?;
    let taos = builder.build()?;
    let db = "tmq";

    // Reset topic, database and schema so the example can be re-run.
    // `format!` is used only where `{db}` is actually interpolated; the plain
    // literals are converted with `to_string()` so every element is a `String`.
    taos.exec_many([
        "DROP TOPIC IF EXISTS tmq_meters".to_string(),
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
        // The space before the line continuation keeps `)` and `TAGS` apart in
        // the emitted SQL (the continuation swallows the newline and indent).
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))"
            .to_string(),
        format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}"),
    ])
    .await?;

    // Write the sample data concurrently, and give the writer a one-second
    // head start before the consumer subscribes.
    let task = tokio::spawn(prepare(taos));
    tokio::time::sleep(Duration::from_secs(1)).await;

    let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
    let mut consumer = tmq.build()?;
    consumer.subscribe(["tmq_meters"]).await?;

    // Scoped so `stream` is dropped before `consumer` is used again below.
    {
        let mut stream = consumer.stream();

        // NOTE(review): this loop only ends when the stream yields `None` or an
        // error — confirm the intended termination condition for real use.
        while let Some((offset, message)) = stream.try_next().await? {
            let topic = offset.topic();
            let vgroup_id = offset.vgroup_id();
            println!("* in vgroup id {vgroup_id} of topic {topic}\n");

            // Messages that `into_data()` does not turn into data are skipped
            // here but still committed.
            if let Some(data) = message.into_data() {
                while let Some(block) = data.fetch_raw_block().await? {
                    let name = block.table_name();
                    let records: Vec<Record> = block.deserialize().try_collect()?;
                    println!(
                        "** table: {}, got {} records: {:#?}\n",
                        // NOTE(review): panics if the block carries no table
                        // name — presumably always set for data blocks; confirm.
                        name.unwrap(),
                        records.len(),
                        records
                    );
                }
            }

            consumer.commit(offset).await?;
        }
    }

    consumer.unsubscribe().await;

    // First `?` surfaces a join failure of the spawned task, the second the
    // `Result` returned by `prepare` itself.
    task.await??;

    Ok(())
}
Contribution
All contributions are welcome.
License
Licensed under the same terms as TDengine.