use ella_common::{Duration, Time};
use ella_engine as engine;
use ella_tensor as tensor;
use engine::{
config::{EllaConfig, EngineConfig},
table::{info::TopicBuilder, ColumnBuilder},
};
use futures::SinkExt;
use opentelemetry::{
sdk::{trace, Resource},
KeyValue,
};
use tensor::{Tensor, Tensor1, TensorType};
use tokio_stream::StreamExt;
use tracing_subscriber::{
prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, Layer,
};
/// Example entry point: publishes rows to the `point` topic for roughly two seconds, then
/// reads them back with a SQL query and prints every row.
///
/// Steps, in order:
/// 1. Install an OTLP (tonic) tracing pipeline plus an `INFO`-filtered `fmt` layer.
/// 2. Create an engine context at `file:///tmp/ella/`.
/// 3. Create the `point` topic and feed rows into it until the deadline passes.
/// 4. Run `SELECT * FROM point ORDER BY time` and print each decoded row.
/// 5. Shut down the engine context and the global tracer provider.
///
/// # Errors
///
/// Returns the first error propagated by `?` from tracing setup, engine creation, topic
/// creation, publishing, querying or shutdown. On such an early return the explicit
/// context/tracer shutdown calls at the end of the function are not reached.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Export spans over OTLP using the batch span processor on the Tokio runtime.
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_trace_config(
            trace::config()
                .with_resource(Resource::new(vec![KeyValue::new("service.name", "ella")])),
        )
        .install_batch(opentelemetry::runtime::Tokio)?;

    // Two layers: OpenTelemetry export (unfiltered here) and console output filtered by "INFO".
    tracing_subscriber::registry()
        .with(tracing_opentelemetry::layer().with_tracer(tracer))
        .with(
            tracing_subscriber::fmt::layer()
                .with_filter(tracing_subscriber::EnvFilter::new("INFO")),
        )
        .init();

    let config = EllaConfig::builder()
        .engine_config(EngineConfig::builder().serve_metrics("0.0.0.0:8888"))
        .build();
    // NOTE(review): the trailing `true` is presumably a "create if missing" flag — confirm
    // against `engine::create`.
    let ctx = engine::create("file:///tmp/ella/", config, true).await?;

    // Topic schema. `x` holds 512 floats per row; `y` holds a 2x2 block of strings per row.
    let topic = TopicBuilder::new()
        .column(ColumnBuilder::new("i", TensorType::Int32))
        .column(ColumnBuilder::new("dt", TensorType::Duration))
        .column(ColumnBuilder::new("x", TensorType::Float32).row_shape((512,)))
        .column(ColumnBuilder::new("y", TensorType::String).row_shape((2, 2)));
    // NOTE(review): the meaning of the `true, false` flags is not visible in this file —
    // confirm against `create_topic`.
    let pb = ctx
        .create_topic("point", topic, true, false)
        .await?
        .publish();
    let mut sink = pb.rows(1)?;

    // Publish rows back-to-back (no delay between iterations) until two seconds have elapsed.
    let start = ella_common::now();
    let end = start + Duration::seconds(2);
    let mut i = 0_i32;
    while ella_common::now() < end {
        i += 1;
        // The tuple leads with a timestamp followed by one value per declared column
        // (`i`, `dt`, `x`, `y`); presumably the engine supplies the `time` column itself —
        // confirm.
        sink.feed((
            ella_common::now(),
            i,
            Duration::milliseconds(50),
            Tensor::linspace(i as f32, (i + 1) as f32, 512),
            tensor::tensor![
                ["A".to_string(), "B".to_string()],
                ["C".to_string(), "D".to_string()]
            ],
        ))
        .await?;
    }
    // Close the sink before querying so buffered rows are handed off.
    sink.close().await?;

    // `y` was declared with row shape (2, 2) and is fed 2x2 tensors above, so it is decoded
    // as a rank-2 tensor to match what was written.
    let mut rows = ctx
        .query("SELECT * FROM point ORDER BY time")
        .await?
        .rows::<(Time, i32, Duration, Tensor1<f32>, tensor::Tensor2<String>)>()
        .await?;
    while let Some(row) = rows.try_next().await? {
        println!("{:?}", row);
    }

    ctx.shutdown().await?;
    // Shut down the global tracer provider so the batch exporter can finish before exit.
    opentelemetry::global::shutdown_tracer_provider();
    Ok(())
}