ticksupply 0.2.0

Official Rust client for the Ticksupply market data API
Documentation
//! Live smoke test: exercise catalog, availability, billing, subscription
//! lifecycle, exports lifecycle, export-schemas list, and the full
//! export-schema CRUD + draft → publish workflow.
//!
//! Run: `TICKSUPPLY_API_KEY=... cargo run --example smoke_test`
//!
//! Creates and deletes a single ephemeral subscription and one ephemeral
//! export job. Safe to run.

use std::time::Duration;

use chrono::{Duration as ChronoDuration, Utc};
use futures::TryStreamExt;
use ticksupply::resources::export_schemas::{
    DataType, ExchangeExtractor, MetaExtraction, MetaValue, SchemaColumn, SchemaContent,
    StreamCategory, TimestampFormat,
};
use ticksupply::resources::exports::ExportStatus;
use ticksupply::Client;

#[tokio::main]
async fn main() -> ticksupply::Result<()> {
    let client = Client::new()?;

    println!("== exchanges ==");
    let exchanges = client.exchanges().list().await?;
    for ex in &exchanges {
        println!("  {:<15} {}", ex.code, ex.display_name);
    }

    println!("\n== binance instruments matching BTC (first 5) ==");
    let page = client
        .exchanges()
        .list_instruments("binance")
        .search("BTC")
        .limit(5)
        .send()
        .await?;
    for inst in &page.items {
        println!(
            "  {:<12} {}/{} type={}",
            inst.symbol,
            inst.base.as_deref().unwrap_or("?"),
            inst.quote.as_deref().unwrap_or("?"),
            inst.instrument_type.as_deref().unwrap_or("?"),
        );
    }
    println!("  total matching: {:?}", page.total);

    println!("\n== binance BTCUSDT trades datastream ==");
    let ds_page = client
        .datastreams()
        .list()
        .exchange("binance")
        .instrument("BTCUSDT")
        .stream_type("trades")
        .limit(5)
        .send()
        .await?;
    for ds in &ds_page.items {
        println!(
            "  id={} {} {} {} wire={}",
            ds.datastream_id, ds.exchange, ds.instrument, ds.stream_type, ds.wire_format
        );
    }
    let Some(ds) = ds_page.items.first() else {
        println!("  (none found — skipping availability + subscription lifecycle)");
        return Ok(());
    };
    let datastream_id = ds.datastream_id;

    println!("\n== availability for datastream {datastream_id} ==");
    let avail = client.availability().get(datastream_id).await?;
    println!(
        "  {} / {} / {}",
        avail.datastream.exchange, avail.datastream.instrument, avail.datastream.stream_type
    );
    for r in &avail.ranges {
        println!(
            "  range: {}{}  ~{} rows",
            r.from_ns.as_i64(),
            r.to_ns.as_i64(),
            r.rows_estimate
        );
    }

    println!("\n== billing summary ==");
    let s = client.billing().summary().await?;
    println!("  access_status: {:?}", s.access_status);
    println!("  plan_code:     {:?}", s.plan_code);
    println!("  period_end:    {:?}", s.current_period_end);
    println!(
        "  usage: extra_stream_minutes={}, export_gb_total={}, export_gb_extra={}",
        s.usage.extra_stream_minutes_total, s.usage.export_gb_total, s.usage.export_gb_extra
    );

    println!("\n== subscription lifecycle (create → pause → resume → delete) ==");
    // Sweep any existing subscription on this datastream so the lifecycle
    // starts clean. The server's list endpoint doesn't filter by
    // datastream_id, so page through and filter client-side.
    let stream = client.subscriptions().list().limit(50).stream();
    tokio::pin!(stream);
    while let Some(existing) = stream.try_next().await? {
        if existing.datastream_id != datastream_id {
            continue;
        }
        println!("  sweeping {} ({:?})", existing.id, existing.status);
        client
            .subscriptions()
            .delete(&existing.id)
            .idempotency_key(uuid::Uuid::new_v4().to_string())
            .send()
            .await?;
    }
    let idem = uuid::Uuid::new_v4().to_string();
    let sub = client
        .subscriptions()
        .create(datastream_id)
        .idempotency_key(&idem)
        .send()
        .await?;
    println!("  created: {} status={:?}", sub.id, sub.status);

    client
        .subscriptions()
        .pause(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    let paused = client.subscriptions().get(&sub.id).await?;
    println!("  paused:  {} status={:?}", paused.id, paused.status);

    client
        .subscriptions()
        .resume(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    let resumed = client.subscriptions().get(&sub.id).await?;
    println!("  resumed: {} status={:?}", resumed.id, resumed.status);

    client
        .subscriptions()
        .delete(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!("  deleted: {}", sub.id);

    println!("\n== listing first 3 subscriptions via stream ==");
    let stream = client.subscriptions().list().limit(3).stream();
    tokio::pin!(stream);
    let mut n = 0;
    while let Some(s) = stream.try_next().await? {
        n += 1;
        println!("  {}: {} {:?}", n, s.id, s.status);
        if n >= 3 {
            break;
        }
    }

    println!("\n== export schemas list ==");
    let schemas = client.export_schemas().list().await?;
    println!("  {} schema(s)", schemas.len());
    for s in schemas.iter().take(5) {
        println!(
            "  - {:<20} {:<20} v{} built_in={} has_draft={}",
            s.id, s.name, s.version, s.is_built_in, s.has_draft
        );
    }

    println!("\n== export-schema lifecycle (create → draft → update → publish → delete) ==");
    let schema_name = format!("smoke-{}", uuid::Uuid::new_v4().simple());
    let v1 = SchemaContent::builder()
        .column(SchemaColumn::meta(
            "ts",
            MetaExtraction::new(MetaValue::CollectionTimestampNs).format(TimestampFormat::Iso8601),
        ))
        .column(SchemaColumn::data("price").exchange(
            "binance",
            ExchangeExtractor::json("data.p", DataType::Decimal(18)),
        ))
        .build();
    let created = client
        .export_schemas()
        .create(&schema_name, StreamCategory::Trade, v1)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(
        "  created: {} {} v{} (built_in={})",
        created.id, created.name, created.version, created.is_built_in
    );

    let h = client.export_schemas().for_id(&created.id);

    h.create_draft()
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    let draft = h.get_draft().await?;
    println!(
        "  draft created (copy of v{}, has_draft={})",
        draft.version, draft.has_draft
    );

    let v2 = SchemaContent::builder()
        .column(SchemaColumn::meta(
            "ts",
            MetaExtraction::new(MetaValue::CollectionTimestampNs).format(TimestampFormat::Iso8601),
        ))
        .column(SchemaColumn::data("price").exchange(
            "binance",
            ExchangeExtractor::json("data.p", DataType::Decimal(18)),
        ))
        .column(SchemaColumn::data("qty").exchange(
            "binance",
            ExchangeExtractor::json("data.q", DataType::Decimal(18)),
        ))
        .build();
    h.update_draft(v2).send().await?;
    let updated_draft = h.get_draft().await?;
    println!("  draft updated: {} columns", updated_draft.columns.len());

    let published = h
        .publish_draft()
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(
        "  published: v{} has_draft={} columns={}",
        published.version,
        published.has_draft,
        published.columns.len()
    );

    h.delete()
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!("  deleted: {}", created.id);

    println!("\n== export lifecycle (create → poll → download → delete) ==");
    // Need an active or paused subscription for the export to cover data.
    let sub_for_export = client
        .subscriptions()
        .create(datastream_id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!("  subscription: {}", sub_for_export.id);

    let now = Utc::now();
    let export_end = now - ChronoDuration::seconds(60);
    let export_start = export_end - ChronoDuration::minutes(5);
    let job = client
        .exports()
        .create(datastream_id, export_start, export_end)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .schema("raw")
        .send()
        .await?;
    println!("  created: {} status={:?}", job.id, job.status);

    let mut final_job = job.clone();
    for _ in 0..24 {
        let cur = client.exports().get(&job.id).await?;
        println!("    polling: status={:?}", cur.status);
        match cur.status {
            ExportStatus::Succeeded | ExportStatus::Failed | ExportStatus::Canceled => {
                final_job = cur;
                break;
            }
            _ => tokio::time::sleep(Duration::from_secs(5)).await,
        }
    }

    match final_job.status {
        ExportStatus::Succeeded => {
            let d = client.exports().get_download(&job.id).await?;
            println!(
                "  download: {} artifact(s), total {} bytes",
                d.count, d.total_bytes
            );
            for a in d.artifacts.iter().take(3) {
                println!("    {} ({} bytes)", a.filename, a.bytes);
            }
        }
        other => println!("  finished with {:?}: reason={:?}", other, final_job.reason),
    }

    client
        .exports()
        .delete(&job.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!("  deleted export: {}", job.id);
    client
        .subscriptions()
        .delete(&sub_for_export.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!("  deleted subscription: {}", sub_for_export.id);

    println!("\nall good");
    Ok(())
}