use crate::{
api::{AttrVal, Nanoseconds, TimelineId},
ingest_client::{
dynamic::{DynamicIngestClient, DynamicIngestError},
IngestClient, IngestStatus, ReadyState,
},
ingest_protocol::InternedAttrKey,
};
use std::{collections::BTreeMap, time::SystemTime};
pub use super::config::Config;
/// Wrapper around a [`DynamicIngestClient`] that accepts attribute keys as
/// plain strings, interns them on first use, and decorates outgoing timeline
/// metadata and events with configured or automatically generated attributes.
pub struct Client {
/// Underlying ingest client; every operation is forwarded to it.
inner: DynamicIngestClient,
/// When set, sent as the `timeline.run_id` attribute with every timeline
/// metadata message.
run_id: Option<String>,
/// When set, sent as the `timeline.time_domain` attribute with every
/// timeline metadata message.
time_domain: Option<String>,
/// Cache from normalized (`timeline.`-prefixed) attribute names to the
/// interned keys returned when they were declared.
timeline_keys: BTreeMap<String, InternedAttrKey>,
/// Cache from normalized (`event.`-prefixed) attribute names to interned
/// keys; consulted before an event attribute key is declared.
event_keys: BTreeMap<String, InternedAttrKey>,
/// Configured attributes appended to every timeline metadata message.
additional_timeline_attributes: Vec<(InternedAttrKey, AttrVal)>,
/// Configured attributes appended to every timeline metadata message;
/// caller-supplied attributes that use one of these keys are dropped.
override_timeline_attributes: Vec<(InternedAttrKey, AttrVal)>,
/// When `true`, `send_event` adds an `event.timestamp` taken from the
/// system clock if the caller did not supply a timestamp attribute.
enable_auto_timestamp: bool,
}
impl Client {
    /// Wraps a ready ingest client and declares the configured timeline
    /// attribute keys up front.
    ///
    /// Each key in `timeline_attr_cfg.additional_timeline_attributes` and
    /// `timeline_attr_cfg.override_timeline_attributes` is normalized to carry
    /// the `timeline.` prefix, declared on the underlying client, and stored
    /// with its value for use by [`Client::send_timeline_attrs`].
    ///
    /// `run_id` and `time_domain`, when present, are later sent as
    /// `timeline.run_id` and `timeline.time_domain`. Automatic event
    /// timestamps start out enabled.
    ///
    /// # Errors
    ///
    /// Returns a [`DynamicIngestError`] if declaring any configured key fails.
    pub async fn new(
        client: IngestClient<ReadyState>,
        timeline_attr_cfg: crate::reflector_config::TimelineAttributes,
        run_id: Option<String>,
        time_domain: Option<String>,
    ) -> Result<Self, DynamicIngestError> {
        let mut client = Self {
            inner: client.into(),
            run_id,
            time_domain,
            timeline_keys: Default::default(),
            event_keys: Default::default(),
            additional_timeline_attributes: Default::default(),
            override_timeline_attributes: Default::default(),
            enable_auto_timestamp: true,
        };

        for kvp in timeline_attr_cfg.additional_timeline_attributes.into_iter() {
            let k = client.prep_timeline_attr(kvp.0.as_ref()).await?;
            client.additional_timeline_attributes.push((k, kvp.1));
        }

        for kvp in timeline_attr_cfg.override_timeline_attributes.into_iter() {
            let k = client.prep_timeline_attr(kvp.0.as_ref()).await?;
            client.override_timeline_attributes.push((k, kvp.1));
        }

        Ok(client)
    }

    /// Turns off automatic `event.timestamp` generation in
    /// [`Client::send_event`]. There is no way to turn it back on.
    pub fn disable_auto_timestamp(&mut self) {
        self.enable_auto_timestamp = false;
    }

    /// Opens the timeline `id` on the underlying ingest client.
    ///
    /// # Errors
    ///
    /// Returns a [`DynamicIngestError`] if the underlying client fails to
    /// open the timeline.
    pub async fn switch_timeline(&mut self, id: TimelineId) -> Result<(), DynamicIngestError> {
        self.inner.open_timeline(id).await?;
        Ok(())
    }

    /// Sends one timeline metadata message through the underlying client.
    ///
    /// The attributes are assembled in this order:
    ///
    /// 1. `timeline.name`, set to `name`;
    /// 2. `timeline.run_id` and `timeline.time_domain`, when configured;
    /// 3. the configured additional attributes;
    /// 4. the configured override attributes;
    /// 5. the caller's `timeline_attrs`, skipping any whose normalized key
    ///    matches an override attribute.
    ///
    /// Keys in `timeline_attrs` may be given with or without the `timeline.`
    /// prefix.
    ///
    /// # Errors
    ///
    /// Returns a [`DynamicIngestError`] if declaring a key or sending the
    /// metadata fails.
    pub async fn send_timeline_attrs(
        &mut self,
        name: &str,
        timeline_attrs: impl IntoIterator<Item = (&str, AttrVal)>,
    ) -> Result<(), DynamicIngestError> {
        let name_key = self.prep_timeline_attr("timeline.name").await?;
        let mut interned_attrs = vec![(name_key, name.into())];

        // The values are cloned out first because `prep_timeline_attr` needs
        // `&mut self` while they are in use.
        if let Some(run_id) = self.run_id.clone() {
            let k = self.prep_timeline_attr("timeline.run_id").await?;
            interned_attrs.push((k, AttrVal::String(run_id.into())));
        }
        if let Some(time_domain) = self.time_domain.clone() {
            let k = self.prep_timeline_attr("timeline.time_domain").await?;
            interned_attrs.push((k, AttrVal::String(time_domain.into())));
        }

        interned_attrs.extend(self.additional_timeline_attributes.iter().cloned());
        interned_attrs.extend(self.override_timeline_attributes.iter().cloned());

        for (k, v) in timeline_attrs {
            let k = self.prep_timeline_attr(k).await?;
            // Configured overrides win over caller-supplied values.
            let overridden = self
                .override_timeline_attributes
                .iter()
                .any(|(ko, _)| k == *ko);
            if !overridden {
                interned_attrs.push((k, v));
            }
        }

        self.inner.timeline_metadata(interned_attrs).await?;
        Ok(())
    }

    /// Returns the interned key for timeline attribute `k`, declaring it on
    /// the underlying client the first time it is seen.
    ///
    /// `k` is normalized to carry the `timeline.` prefix before lookup, so
    /// `"name"` and `"timeline.name"` resolve to the same key.
    async fn prep_timeline_attr(&mut self, k: &str) -> Result<InternedAttrKey, DynamicIngestError> {
        let key = normalize_timeline_key(k);
        if let Some(ik) = self.timeline_keys.get(&key) {
            return Ok(*ik);
        }

        let ik = self.inner.declare_attr_key(key.clone()).await?;
        self.timeline_keys.insert(key, ik);
        Ok(ik)
    }

    /// Sends one event through the underlying client.
    ///
    /// The event carries `event.name` (set to `name`) followed by `attrs`,
    /// whose keys may be given with or without the `event.` prefix. When
    /// automatic timestamps are enabled and `attrs` contains neither
    /// `timestamp` nor `event.timestamp`, an `event.timestamp` holding the
    /// current system time (nanoseconds since the Unix epoch) is appended.
    /// If the system clock reads earlier than the Unix epoch, no automatic
    /// timestamp is added.
    ///
    /// `ordering` is passed to the underlying client unchanged.
    ///
    /// # Errors
    ///
    /// Returns a [`DynamicIngestError`] if declaring a key or sending the
    /// event fails.
    pub async fn send_event(
        &mut self,
        name: &str,
        ordering: u128,
        attrs: impl IntoIterator<Item = (&str, AttrVal)>,
    ) -> Result<(), DynamicIngestError> {
        let mut interned_attrs = Vec::new();
        let mut have_timestamp = false;

        interned_attrs.push((self.prep_event_attr("event.name").await?, name.into()));

        for (k, v) in attrs {
            if self.enable_auto_timestamp && (k == "timestamp" || k == "event.timestamp") {
                have_timestamp = true;
            }
            interned_attrs.push((self.prep_event_attr(k).await?, v));
        }

        if self.enable_auto_timestamp && !have_timestamp {
            // A host clock set before the Unix epoch gives no meaningful
            // timestamp; omit it instead of panicking inside library code.
            if let Some(now_ns) = Self::unix_time_nanos() {
                interned_attrs.push((
                    self.prep_event_attr("event.timestamp").await?,
                    Nanoseconds::from(now_ns).into(),
                ));
            }
        }

        self.inner.event(ordering, interned_attrs).await?;
        Ok(())
    }

    /// Flushes the underlying ingest client.
    ///
    /// # Errors
    ///
    /// Returns a [`DynamicIngestError`] if the flush fails.
    pub async fn flush(&mut self) -> Result<(), DynamicIngestError> {
        self.inner.flush().await?;
        Ok(())
    }

    /// Returns the ingest status reported by the underlying client.
    ///
    /// # Errors
    ///
    /// Returns a [`DynamicIngestError`] if the status request fails.
    pub async fn status(&mut self) -> Result<IngestStatus, DynamicIngestError> {
        Ok(self.inner.status().await?)
    }

    /// Returns the interned key for event attribute `k`, declaring it on the
    /// underlying client the first time it is seen.
    ///
    /// `k` is normalized to carry the `event.` prefix before lookup, so
    /// `"name"` and `"event.name"` resolve to the same key.
    async fn prep_event_attr(&mut self, k: &str) -> Result<InternedAttrKey, DynamicIngestError> {
        let key = normalize_event_key(k);
        if let Some(ik) = self.event_keys.get(&key) {
            return Ok(*ik);
        }

        let ik = self.inner.declare_attr_key(key.clone()).await?;
        // Store in the event cache (not the timeline cache) so the lookup
        // above hits on the next use of this key.
        self.event_keys.insert(key, ik);
        Ok(ik)
    }

    /// Current system time as nanoseconds since the Unix epoch, or `None`
    /// when the system clock reads earlier than the epoch.
    fn unix_time_nanos() -> Option<u64> {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            // u64 nanoseconds cover dates through the year 2554, so the
            // narrowing cast is not a practical concern.
            .map(|since_epoch| since_epoch.as_nanos() as u64)
    }
}
/// Returns `s` with a `timeline.` prefix, adding the prefix only when `s`
/// does not already start with it.
fn normalize_timeline_key(s: &str) -> String {
    const PREFIX: &str = "timeline.";

    if s.starts_with(PREFIX) {
        return String::from(s);
    }

    let mut prefixed = String::with_capacity(PREFIX.len() + s.len());
    prefixed.push_str(PREFIX);
    prefixed.push_str(s);
    prefixed
}
/// Returns `s` with an `event.` prefix, adding the prefix only when `s` does
/// not already start with it.
fn normalize_event_key(s: &str) -> String {
    const PREFIX: &str = "event.";

    if s.starts_with(PREFIX) {
        return String::from(s);
    }

    let mut prefixed = String::with_capacity(PREFIX.len() + s.len());
    prefixed.push_str(PREFIX);
    prefixed.push_str(s);
    prefixed
}