iotdb-client-rs 0.2.0

Rust client for apache iotdb
Documentation
iotdb-client-rs-0.2.0 has been yanked.

Apache IoTDB

Main Mac and Linux Main Win coveralls GitHub release License IoTDB Website

Apache IoTDB (Database for Internet of Things) is an IoT native database with high performance for data management and analysis, deployable on the edge and the cloud. Due to its light-weight architecture, high performance and rich feature set together with its deep integration with Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, high-speed data ingestion and complex data analysis in the IoT industrial fields.

Apache IoTDB Client for Rust

Overview

This is the Rust client of Apache IoTDB.

Apache IoTDB website: https://iotdb.apache.org Apache IoTDB Github: https://github.com/apache/iotdb

Prerequisites

How to Use the Client (Quick Start)

Usage

Put this in your Cargo.toml:

[dependencies]
iotdb-client-rs="0.1.0"
chrono="0.4.19"
prettytable-rs="0.8.0"
use std::error::Error;
use std::vec;

use chrono;

use chrono::Local;
use iotdb_client_rs::client::remote::{Config, RpcSession};
use iotdb_client_rs::client::{DataSet, MeasurementSchema, Session, Tablet, Value};
use iotdb_client_rs::protocal::{TSCompressionType, TSDataType, TSEncoding};
use prettytable::{cell, Cell, Row, Table};

fn print_dataset(dataset: Box<dyn DataSet>) -> Result<(), Box<dyn Error>> {
    let mut table = Table::new();

    let mut title_cells: Vec<Cell> = Vec::new();

    let is_ignore_timestamp = dataset.is_ignore_timestamp();
    if !is_ignore_timestamp {
        title_cells.push(cell!("Time"));
    }

    dataset
        .get_column_names()
        .iter()
        .for_each(|name| title_cells.push(cell!(name)));

    table.set_titles(Row::new(title_cells));

    dataset.for_each(|record| {
        let mut row_cells: Vec<Cell> = Vec::new();
        if !is_ignore_timestamp {
            row_cells.push(cell!(record.timestamp.to_string()));
        }

        record
            .values
            .iter()
            .for_each(|v| row_cells.push(cell!(v.to_string())));

        table.add_row(Row::new(row_cells));
    });
    table.printstd();
    Ok(())
}

fn main() -> Result<(), Box<dyn Error>> {
    let config = Config {
        host: String::from("127.0.0.1"),
        port: 6667,
        username: String::from("root"),
        password: String::from("root"),
        ..Default::default()
    };

    //rpc session
    let session = RpcSession::new(&config)?;
    run_example(session)?;

    //Local filesystem session
    // let session = DirectSession::new("/data/apache-iotdb-0.12.3-server-bin");
    // run_example(session)?;

    Ok(())
}

fn run_example<T: Session>(mut session: T) -> Result<(), Box<dyn Error>> {
    session.open()?;

    let tz = session.get_time_zone()?;
    if tz != "Asia/Shanghai" {
        session.set_time_zone("Asia/Shanghai")?;
    }

    session.set_storage_group("root.ln1")?;
    session.delete_storage_group("root.ln1")?;

    session.set_storage_group("root.ln1")?;
    session.set_storage_group("root.ln2")?;
    session.delete_storage_groups(vec!["root.ln1", "root.ln2"])?;

    //create_timeseries
    session.create_timeseries(
        "root.sg1.dev2.status",
        TSDataType::Float,
        TSEncoding::Plain,
        TSCompressionType::SNAPPY,
        None,
        None,
        None,
        None,
    )?;
    session.delete_timeseries(vec!["root.sg1.dev2.status"])?;

    //insert_record
    session.insert_record(
        "root.sg1.dev5",
        vec!["online", "desc"],
        vec![Value::Bool(false), Value::Text("F4145".to_string())],
        Local::now().timestamp_millis(),
        false,
    )?;
    session.delete_timeseries(vec!["root.sg1.dev5.online", "root.sg1.dev5.desc"])?;

    //insert_records
    session.insert_records(
        vec!["root.sg1.dev1"],
        vec![vec![
            "restart_count",
            "tick_count",
            "price",
            "temperature",
            "description",
            "status",
        ]],
        vec![vec![
            Value::Int32(1),
            Value::Int64(2018),
            Value::Double(1988.1),
            Value::Float(12.1),
            Value::Text("Test Device 1".to_string()),
            Value::Bool(false),
        ]],
        vec![Local::now().timestamp_millis()],
    )?;
    session.delete_timeseries(vec![
        "root.sg1.dev1.restart_count",
        "root.sg1.dev1.tick_count",
        "root.sg1.dev1.price",
        "root.sg1.dev1.temperature",
        "root.sg1.dev1.description",
        "root.sg1.dev1.status",
    ])?;

    //create_multi_timeseries
    session.create_multi_timeseries(
        vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"],
        vec![TSDataType::Float, TSDataType::Text],
        vec![TSEncoding::Plain, TSEncoding::Plain],
        vec![TSCompressionType::SNAPPY, TSCompressionType::SNAPPY],
        None,
        None,
        None,
        None,
    )?;
    session.delete_timeseries(vec!["root.sg3.dev1.temperature", "root.sg3.dev1.desc"])?;

    //delete_timeseries
    session.insert_string_record(
        "root.ln.wf02.wt02",
        vec!["id", "location"],
        vec!["SN:001", "Beijing"],
        Local::now().timestamp_millis(),
        false,
    )?;
    session.delete_timeseries(vec!["root.ln.wf02.wt02.id", "root.ln.wf02.wt02.location"])?;

    //insert_records_of_one_device
    session.insert_records_of_one_device(
        "root.sg1.dev0",
        vec![
            Local::now().timestamp_millis(),
            Local::now().timestamp_millis() + 1,
        ],
        vec![
            vec!["restart_count", "tick_count", "price"],
            vec!["temperature", "description", "status"],
        ],
        vec![
            vec![Value::Int32(1), Value::Int64(2018), Value::Double(1988.1)],
            vec![
                Value::Float(12.1),
                Value::Text("Test Device 1".to_string()),
                Value::Bool(false),
            ],
        ],
        false,
    )?;

    //tablet
    let mut ts = Local::now().timestamp_millis();

    let tablet1 = create_tablet(5, ts);
    ts += 5;

    let tablet2 = create_tablet(10, ts);
    ts += 10;

    let tablet3 = create_tablet(2, ts);

    session.insert_tablet(&tablet1, true)?;
    session.insert_tablets(vec![&tablet2, &tablet3], true)?;

    //delete_data
    session.insert_records_of_one_device(
        "root.sg1.dev1",
        vec![1, 16],
        vec![vec!["status"], vec!["status"]],
        vec![vec![Value::Bool(true)], vec![Value::Bool(true)]],
        true,
    )?;
    session.delete_data(vec!["root.sg1.dev1.status"], 1, 16)?;

    let dataset = session.execute_query_statement("select * from root.ln.device2", None)?;
    print_dataset(dataset)?;
    // dataset.for_each(|r| println!("timestamp: {} {:?}", r.timestamp, r.values));
    // let timestamps: Vec<i64> = dataset.map(|r| r.timestamp).collect();
    // let count = dataset.count();

    let ds = session.execute_statement("show timeseries", None)?;
    print_dataset(ds)?;

    session.execute_batch_statement(vec![
        "insert into root.sg1.dev6(time,s5) values(1,true)",
        "insert into root.sg1.dev6(time,s5) values(2,true)",
        "insert into root.sg1.dev6(time,s5) values(3,true)",
    ])?;

    let ds = session.execute_raw_data_query(
        vec![
            "root.ln.device2.restart_count",
            "root.ln.device2.tick_count",
            "root.ln.device2.description",
        ],
        0,
        i64::MAX,
    )?;
    print_dataset(ds)?;

    if let Some(dataset) = session.execute_update_statement("delete timeseries root.sg1.dev1.*")? {
        print_dataset(dataset)?;
    }

    session.close()?;

    Ok(())
}

fn create_tablet(row_count: i32, start_timestamp: i64) -> Tablet {
    let mut tablet = Tablet::new(
        "root.ln.device2",
        vec![
            MeasurementSchema::new(
                String::from("status"),
                TSDataType::Boolean,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("restart_count"),
                TSDataType::Int32,
                TSEncoding::RLE,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("tick_count"),
                TSDataType::Int64,
                TSEncoding::RLE,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("temperature"),
                TSDataType::Float,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("price"),
                TSDataType::Double,
                TSEncoding::Gorilla,
                TSCompressionType::SNAPPY,
                None,
            ),
            MeasurementSchema::new(
                String::from("description"),
                TSDataType::Text,
                TSEncoding::Plain,
                TSCompressionType::SNAPPY,
                None,
            ),
        ],
    );
    (0..row_count).for_each(|row| {
        let ts = start_timestamp + row as i64;
        tablet.add_row(
            vec![
                Value::Bool(ts % 2 == 0),
                Value::Int32(row),
                Value::Int64(row as i64),
                Value::Float(row as f32 + 0.1),
                Value::Double(row as f64 + 0.2),
                Value::Text(format!("ts: {}", ts).to_string()),
            ],
            ts,
        );
    });
    tablet
}