use tracing::debug;
use super::session::SyncSession;
use super::wire::*;
#[derive(Debug)]
pub struct TimeseriesIngestData {
pub collection: String,
pub ilp_payload: Vec<u8>,
pub lite_id: String,
pub sample_count: u64,
}
impl SyncSession {
pub fn handle_timeseries_push(
&mut self,
msg: &TimeseriesPushMsg,
) -> (SyncFrame, Option<TimeseriesIngestData>) {
self.last_activity = std::time::Instant::now();
if !self.authenticated {
let ack = TimeseriesAckMsg {
collection: msg.collection.clone(),
accepted: 0,
rejected: msg.sample_count,
lsn: 0,
};
return (
SyncFrame::encode_or_empty(SyncMessageType::TimeseriesAck, &ack),
None,
);
}
let timestamps = nodedb_codec::GorillaDecoder::new(&msg.ts_block).decode_all();
let values = nodedb_codec::GorillaDecoder::new(&msg.val_block).decode_all();
let decoded_count = timestamps.len().min(values.len());
if decoded_count == 0 {
let ack = TimeseriesAckMsg {
collection: msg.collection.clone(),
accepted: 0,
rejected: msg.sample_count,
lsn: 0,
};
return (
SyncFrame::encode_or_empty(SyncMessageType::TimeseriesAck, &ack),
None,
);
}
self.mutations_processed += decoded_count as u64;
let mut ilp_lines = String::with_capacity(decoded_count * 80);
for i in 0..decoded_count {
let (ts, _) = timestamps[i];
let (_, val) = values[i];
ilp_lines.push_str(&msg.collection);
ilp_lines.push_str(",__source=");
ilp_lines.push_str(&msg.lite_id);
ilp_lines.push_str(" value=");
ilp_lines.push_str(&val.to_string());
ilp_lines.push(' ');
ilp_lines.push_str(&(ts * 1_000_000).to_string());
ilp_lines.push('\n');
}
debug!(
session = %self.session_id,
collection = %msg.collection,
decoded = decoded_count,
lite_id = %msg.lite_id,
"timeseries push decoded, dispatching to Data Plane"
);
let ack = TimeseriesAckMsg {
collection: msg.collection.clone(),
accepted: decoded_count as u64,
rejected: msg.sample_count.saturating_sub(decoded_count as u64),
lsn: self.mutations_processed,
};
let ingest = TimeseriesIngestData {
collection: msg.collection.clone(),
ilp_payload: ilp_lines.into_bytes(),
lite_id: msg.lite_id.clone(),
sample_count: decoded_count as u64,
};
(
SyncFrame::encode_or_empty(SyncMessageType::TimeseriesAck, &ack),
Some(ingest),
)
}
}