use std::time::Duration;
use chrono::{Duration as ChronoDuration, Utc};
use futures::TryStreamExt;
use ticksupply::resources::export_schemas::{
DataType, ExchangeExtractor, MetaExtraction, MetaValue, SchemaColumn, SchemaContent,
StreamCategory, TimestampFormat,
};
use ticksupply::resources::exports::ExportStatus;
use ticksupply::Client;
/// End-to-end walkthrough of the `ticksupply` client.
///
/// Runs, in order: exchange listing, instrument search, datastream lookup,
/// availability, billing summary, a subscription lifecycle, an export-schema
/// lifecycle and an export lifecycle, printing the outcome of every step.
///
/// # Errors
///
/// Any failing client call is propagated with `?`, which aborts the run at
/// that point. Resources created earlier in the run are NOT cleaned up on such
/// an early exit; only the subscription sweep at the start of the subscription
/// section removes leftovers for the selected datastream.
#[tokio::main]
async fn main() -> ticksupply::Result<()> {
    // Upper bound on export status polls, and the pause between two polls.
    const MAX_POLLS: u32 = 24;
    const POLL_INTERVAL: Duration = Duration::from_secs(5);

    // NOTE(review): `Client::new()` takes no arguments, so credentials are
    // presumably picked up from the environment — confirm against the SDK docs.
    let client = Client::new()?;

    // ---- Exchanges -------------------------------------------------------
    println!("== exchanges ==");
    let exchanges = client.exchanges().list().await?;
    for ex in &exchanges {
        println!(" {:<15} {}", ex.code, ex.display_name);
    }

    // ---- Instrument search -----------------------------------------------
    println!("\n== binance instruments matching BTC (first 5) ==");
    let page = client
        .exchanges()
        .list_instruments("binance")
        .search("BTC")
        .limit(5)
        .send()
        .await?;
    for inst in &page.items {
        // Optional fields are rendered as "?" when absent.
        println!(
            " {:<12} {}/{} type={}",
            inst.symbol,
            inst.base.as_deref().unwrap_or("?"),
            inst.quote.as_deref().unwrap_or("?"),
            inst.instrument_type.as_deref().unwrap_or("?"),
        );
    }
    println!(" total matching: {:?}", page.total);

    // ---- Datastream lookup -----------------------------------------------
    println!("\n== binance BTCUSDT trades datastream ==");
    let ds_page = client
        .datastreams()
        .list()
        .exchange("binance")
        .instrument("BTCUSDT")
        .stream_type("trades")
        .limit(5)
        .send()
        .await?;
    for ds in &ds_page.items {
        println!(
            " id={} {} {} {} wire={}",
            ds.datastream_id, ds.exchange, ds.instrument, ds.stream_type, ds.wire_format
        );
    }
    // Everything below needs a datastream id; without one the run ends
    // successfully here.
    let Some(ds) = ds_page.items.first() else {
        println!(" (none found — skipping availability + subscription lifecycle)");
        return Ok(());
    };
    let datastream_id = ds.datastream_id;

    // ---- Availability ----------------------------------------------------
    println!("\n== availability for datastream {datastream_id} ==");
    let avail = client.availability().get(datastream_id).await?;
    println!(
        " {} / {} / {}",
        avail.datastream.exchange, avail.datastream.instrument, avail.datastream.stream_type
    );
    for r in &avail.ranges {
        println!(
            " range: {} → {} ~{} rows",
            r.from_ns.as_i64(),
            r.to_ns.as_i64(),
            r.rows_estimate
        );
    }

    // ---- Billing ---------------------------------------------------------
    println!("\n== billing summary ==");
    let s = client.billing().summary().await?;
    println!(" access_status: {:?}", s.access_status);
    println!(" plan_code: {:?}", s.plan_code);
    println!(" period_end: {:?}", s.current_period_end);
    println!(
        " usage: extra_stream_minutes={}, export_gb_total={}, export_gb_extra={}",
        s.usage.extra_stream_minutes_total, s.usage.export_gb_total, s.usage.export_gb_extra
    );

    // ---- Subscription lifecycle ------------------------------------------
    println!("\n== subscription lifecycle (create → pause → resume → delete) ==");
    // Sweep: delete every existing subscription on the selected datastream
    // before creating a new one (presumably leftovers from an earlier run).
    // NOTE(review): items are deleted while the paginated stream is still
    // being consumed — confirm the SDK's pagination tolerates that.
    let stream = client.subscriptions().list().limit(50).stream();
    tokio::pin!(stream);
    while let Some(existing) = stream.try_next().await? {
        if existing.datastream_id != datastream_id {
            continue;
        }
        println!(" sweeping {} ({:?})", existing.id, existing.status);
        client
            .subscriptions()
            .delete(&existing.id)
            .idempotency_key(uuid::Uuid::new_v4().to_string())
            .send()
            .await?;
    }

    // Each mutating request carries its own freshly generated idempotency key.
    let idem = uuid::Uuid::new_v4().to_string();
    let sub = client
        .subscriptions()
        .create(datastream_id)
        .idempotency_key(&idem)
        .send()
        .await?;
    println!(" created: {} status={:?}", sub.id, sub.status);

    // The pause/resume responses are discarded; the subscription is re-fetched
    // after each call to print its resulting status.
    client
        .subscriptions()
        .pause(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    let paused = client.subscriptions().get(&sub.id).await?;
    println!(" paused: {} status={:?}", paused.id, paused.status);

    client
        .subscriptions()
        .resume(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    let resumed = client.subscriptions().get(&sub.id).await?;
    println!(" resumed: {} status={:?}", resumed.id, resumed.status);

    client
        .subscriptions()
        .delete(&sub.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(" deleted: {}", sub.id);

    // ---- Streaming list --------------------------------------------------
    println!("\n== listing first 3 subscriptions via stream ==");
    let stream = client.subscriptions().list().limit(3).stream();
    tokio::pin!(stream);
    let mut n = 0;
    while let Some(s) = stream.try_next().await? {
        n += 1;
        println!(" {}: {} {:?}", n, s.id, s.status);
        // Stop explicitly after three items instead of draining the stream.
        if n >= 3 {
            break;
        }
    }

    // ---- Export schemas: list --------------------------------------------
    println!("\n== export schemas list ==");
    let schemas = client.export_schemas().list().await?;
    println!(" {} schema(s)", schemas.len());
    for s in schemas.iter().take(5) {
        println!(
            " - {:<20} {:<20} v{} built_in={} has_draft={}",
            s.id, s.name, s.version, s.is_built_in, s.has_draft
        );
    }

    // ---- Export-schema lifecycle -----------------------------------------
    println!("\n== export-schema lifecycle (create → draft → update → publish → delete) ==");
    // A random suffix gives every run its own schema name.
    let schema_name = format!("smoke-{}", uuid::Uuid::new_v4().simple());
    // Version 1: a timestamp meta column plus a `price` data column.
    let v1 = SchemaContent::builder()
        .column(SchemaColumn::meta(
            "ts",
            MetaExtraction::new(MetaValue::CollectionTimestampNs).format(TimestampFormat::Iso8601),
        ))
        .column(SchemaColumn::data("price").exchange(
            "binance",
            ExchangeExtractor::json("data.p", DataType::Decimal(18)),
        ))
        .build();
    let created = client
        .export_schemas()
        .create(&schema_name, StreamCategory::Trade, v1)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(
        " created: {} {} v{} (built_in={})",
        created.id, created.name, created.version, created.is_built_in
    );

    // Handle scoped to the schema that was just created.
    let h = client.export_schemas().for_id(&created.id);
    h.create_draft()
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    let draft = h.get_draft().await?;
    println!(
        " draft created (copy of v{}, has_draft={})",
        draft.version, draft.has_draft
    );

    // Version 2: the same two columns as v1 plus a `qty` data column.
    let v2 = SchemaContent::builder()
        .column(SchemaColumn::meta(
            "ts",
            MetaExtraction::new(MetaValue::CollectionTimestampNs).format(TimestampFormat::Iso8601),
        ))
        .column(SchemaColumn::data("price").exchange(
            "binance",
            ExchangeExtractor::json("data.p", DataType::Decimal(18)),
        ))
        .column(SchemaColumn::data("qty").exchange(
            "binance",
            ExchangeExtractor::json("data.q", DataType::Decimal(18)),
        ))
        .build();
    h.update_draft(v2).send().await?;
    let updated_draft = h.get_draft().await?;
    println!(" draft updated: {} columns", updated_draft.columns.len());

    let published = h
        .publish_draft()
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(
        " published: v{} has_draft={} columns={}",
        published.version,
        published.has_draft,
        published.columns.len()
    );

    h.delete()
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(" deleted: {}", created.id);

    // ---- Export lifecycle ------------------------------------------------
    println!("\n== export lifecycle (create → poll → download → delete) ==");
    // NOTE(review): a subscription is created before the export — presumably
    // the export requires one on the datastream; confirm against the API docs.
    let sub_for_export = client
        .subscriptions()
        .create(datastream_id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(" subscription: {}", sub_for_export.id);

    // Export window: the five minutes that ended sixty seconds ago.
    let now = Utc::now();
    let export_end = now - ChronoDuration::seconds(60);
    let export_start = export_end - ChronoDuration::minutes(5);
    let job = client
        .exports()
        .create(datastream_id, export_start, export_end)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .schema("raw")
        .send()
        .await?;
    println!(" created: {} status={:?}", job.id, job.status);

    // Poll until the job reaches a terminal status or the poll budget runs out.
    // `final_job` always holds the most recent snapshot, so a timeout reports
    // the last status actually observed rather than the creation-time one.
    let mut final_job = job.clone();
    let mut reached_terminal = false;
    for attempt in 1..=MAX_POLLS {
        final_job = client.exports().get(&job.id).await?;
        println!(" polling: status={:?}", final_job.status);
        if matches!(
            final_job.status,
            ExportStatus::Succeeded | ExportStatus::Failed | ExportStatus::Canceled
        ) {
            reached_terminal = true;
            break;
        }
        // No point sleeping after the final poll.
        if attempt < MAX_POLLS {
            tokio::time::sleep(POLL_INTERVAL).await;
        }
    }

    if reached_terminal {
        match final_job.status {
            ExportStatus::Succeeded => {
                let d = client.exports().get_download(&job.id).await?;
                println!(
                    " download: {} artifact(s), total {} bytes",
                    d.count, d.total_bytes
                );
                for a in d.artifacts.iter().take(3) {
                    println!(" {} ({} bytes)", a.filename, a.bytes);
                }
            }
            other => println!(" finished with {:?}: reason={:?}", other, final_job.reason),
        }
    } else {
        println!(
            " gave up after {} polls: last status={:?}",
            MAX_POLLS, final_job.status
        );
    }

    // Clean up the export and its subscription regardless of the outcome above.
    client
        .exports()
        .delete(&job.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(" deleted export: {}", job.id);
    client
        .subscriptions()
        .delete(&sub_for_export.id)
        .idempotency_key(uuid::Uuid::new_v4().to_string())
        .send()
        .await?;
    println!(" deleted subscription: {}", sub_for_export.id);

    println!("\nall good");
    Ok(())
}