use arrow::record_batch::RecordBatch;
use jammi_db::trigger::{DeliveredBatch, Offset, TopicDefinition};
use prost_types::Timestamp;
use tonic::Status;
use crate::proto::trigger::{ArrowBatch, SubscribedBatch};
use crate::{decode_ipc_stream, encode_ipc_stream};
pub fn decode_publish_batch(
wire: &ArrowBatch,
topic: &TopicDefinition,
) -> Result<RecordBatch, Status> {
let mut batches = decode_ipc_stream(&wire.data_header, &wire.data_body)?;
let batch = match batches.len() {
1 => batches.pop().expect("len checked == 1"),
0 => {
return Err(Status::invalid_argument(
"batch IPC stream contains no batch",
))
}
n => {
return Err(Status::invalid_argument(format!(
"publish carries exactly one batch, got {n}"
)))
}
};
if batch.schema().as_ref() != topic.schema.as_ref() {
return Err(Status::invalid_argument(
"batch schema does not match topic schema",
));
}
Ok(batch)
}
pub fn encode_delivered_batch(
schema: &arrow_schema::SchemaRef,
delivered: DeliveredBatch,
) -> Result<SubscribedBatch, Status> {
let buf = encode_ipc_stream(schema, std::slice::from_ref(&delivered.batch))?;
Ok(SubscribedBatch {
offset: delivered.offset.value(),
produced_at: Some(to_proto_timestamp(delivered.produced_at)),
batch: Some(ArrowBatch {
data_header: Vec::new(),
data_body: buf,
app_metadata: Vec::new(),
}),
})
}
pub fn to_proto_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
let seconds = dt.timestamp();
let nanos = dt.timestamp_subsec_nanos() as i32;
Timestamp { seconds, nanos }
}
pub fn from_proto_timestamp(ts: &Timestamp) -> Result<chrono::DateTime<chrono::Utc>, Status> {
chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
.ok_or_else(|| Status::invalid_argument("produced_at timestamp out of range"))
}
pub fn encode_publish_batch(batch: &RecordBatch) -> Result<ArrowBatch, Status> {
let buf = encode_ipc_stream(&batch.schema(), std::slice::from_ref(batch))?;
Ok(ArrowBatch {
data_header: Vec::new(),
data_body: buf,
app_metadata: Vec::new(),
})
}
pub fn decode_subscribed_batch(wire: SubscribedBatch) -> Result<DeliveredBatch, Status> {
let arrow = wire
.batch
.ok_or_else(|| Status::invalid_argument("subscribed batch missing payload"))?;
let mut batches = decode_ipc_stream(&arrow.data_header, &arrow.data_body)?;
let batch = match batches.len() {
1 => batches.pop().expect("len checked == 1"),
n => {
return Err(Status::invalid_argument(format!(
"subscribed batch carries exactly one record batch, got {n}"
)))
}
};
let produced_at = wire
.produced_at
.as_ref()
.map(from_proto_timestamp)
.transpose()?
.ok_or_else(|| Status::invalid_argument("subscribed batch missing produced_at"))?;
Ok(DeliveredBatch {
offset: Offset::new(wire.offset, produced_at),
produced_at,
batch,
})
}