#[cfg(any(feature = "fanout", test))]
use osproxy_engine::{QueueError, QueuedWrite};
#[cfg(any(feature = "fanout", test))]
use osproxy_sink::{DocOp, WriteOp};
// Protobuf types for the fan-out wire format (`osproxy.fanout.v1`), pulled in
// from a file that the build places in `OUT_DIR`.
// NOTE(review): presumably produced by prost-build in build.rs — confirm.
#[cfg(any(feature = "fanout", test))]
mod pb {
// The included file is generated code that we do not hand-edit, so the lints
// below are silenced for the whole module instead of being fixed in place.
#![allow(
clippy::doc_markdown,
clippy::large_enum_variant,
clippy::trivially_copy_pass_by_ref,
missing_docs,
unreachable_pub
)]
include!(concat!(env!("OUT_DIR"), "/osproxy.fanout.v1.rs"));
}
#[cfg(any(feature = "fanout", test))]
pub(crate) use pb::{OpEnvelope, OpType};
/// How a document body is serialized inside a fan-out envelope.
///
/// The default is [`BodyEncoding::Cbor`].
#[cfg(any(feature = "fanout", test))]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum BodyEncoding {
/// Parse the JSON body and re-encode it as CBOR (`application/cbor`).
#[default]
Cbor,
/// Forward the JSON bytes unchanged (`application/json`).
Json,
}
/// Serializes a JSON document body according to `encoding`.
///
/// Returns the encoded bytes together with the content type that describes
/// them: `application/json` for a verbatim copy, `application/cbor` for the
/// transcoded form.
///
/// # Errors
///
/// Only the CBOR path can fail: when `json` does not parse as JSON, or when
/// the CBOR writer reports an error. The JSON path copies the bytes without
/// validating them.
#[cfg(any(feature = "fanout", test))]
fn encode_body(json: &[u8], encoding: BodyEncoding) -> Result<(Vec<u8>, &'static str), QueueError> {
    match encoding {
        BodyEncoding::Cbor => transcode_to_cbor(json).map(|cbor| (cbor, "application/cbor")),
        BodyEncoding::Json => Ok((json.to_vec(), "application/json")),
    }
}

/// Parses `json` into a generic JSON value and writes that value out as CBOR.
#[cfg(any(feature = "fanout", test))]
fn transcode_to_cbor(json: &[u8]) -> Result<Vec<u8>, QueueError> {
    let parsed = serde_json::from_slice::<serde_json::Value>(json).map_err(|_| QueueError {
        reason: "fan-out body is not valid JSON",
    })?;
    let mut cbor = Vec::new();
    ciborium::into_writer(&parsed, &mut cbor).map_err(|_| QueueError {
        reason: "fan-out body CBOR encoding failed",
    })?;
    Ok(cbor)
}
/// Builds the protobuf [`OpEnvelope`] for a single operation of a queued write.
///
/// The operation id and partition key come from `write`; the target cluster,
/// index, epoch and the document operation itself come from `op`. A missing
/// document id or routing value is sent as an empty string. Operations that
/// carry no body (deletes) get an empty body and an empty content type.
///
/// # Errors
///
/// Returns the [`QueueError`] produced by `encode_body` when the document body
/// cannot be encoded with `encoding`.
#[cfg(any(feature = "fanout", test))]
pub(crate) fn envelope(
    write: &QueuedWrite,
    op: &WriteOp,
    encoding: BodyEncoding,
) -> Result<OpEnvelope, QueueError> {
    // Flatten the four document operations into one shape. `Index`/`Create`
    // have an optional id while `Update`/`Delete` always have one, so the
    // default for a missing id is applied right here.
    let (kind, doc_id, doc_routing, source) = match &op.doc {
        DocOp::Index { id, routing, body } => (
            OpType::Index,
            id.clone().unwrap_or_default(),
            routing.clone().unwrap_or_default(),
            Some(body),
        ),
        DocOp::Create { id, routing, body } => (
            OpType::Create,
            id.clone().unwrap_or_default(),
            routing.clone().unwrap_or_default(),
            Some(body),
        ),
        DocOp::Update { id, routing, body } => (
            OpType::Update,
            id.clone(),
            routing.clone().unwrap_or_default(),
            Some(body),
        ),
        DocOp::Delete { id, routing } => (
            OpType::Delete,
            id.clone(),
            routing.clone().unwrap_or_default(),
            None,
        ),
    };

    let (payload, content_type) = if let Some(json) = source {
        let (bytes, mime) = encode_body(json, encoding)?;
        (bytes, mime.to_owned())
    } else {
        (Vec::new(), String::new())
    };

    let target = &op.target;
    Ok(OpEnvelope {
        op_id: write.op_id.clone(),
        partition: write.partition_key.clone(),
        cluster: target.cluster.as_str().to_owned(),
        index: target.index.as_str().to_owned(),
        epoch: op.epoch.get(),
        // Stored as the enum's integer discriminant.
        op_type: kind as i32,
        id: doc_id,
        routing: doc_routing,
        content_type,
        body: payload,
    })
}
/// Write queue that publishes each queued operation to a Kafka topic through
/// a producer of type `P`.
#[cfg(feature = "fanout")]
pub(crate) struct KafkaWriteQueue<P> {
/// Shared handle to the producer used for sending.
producer: std::sync::Arc<P>,
/// Topic every envelope is published to.
topic: String,
/// Encoding applied to document bodies when building envelopes.
encoding: BodyEncoding,
}
#[cfg(feature = "fanout")]
impl<P> KafkaWriteQueue<P> {
/// Creates a queue that publishes to `topic` through `producer`, encoding
/// document bodies with `encoding`.
pub(crate) fn new(producer: std::sync::Arc<P>, topic: String, encoding: BodyEncoding) -> Self {
Self {
producer,
topic,
encoding,
}
}
}
#[cfg(feature = "fanout")]
impl<P: osproxy_kafka::AckProducer> osproxy_engine::WriteQueue for KafkaWriteQueue<P> {
    /// A Kafka-backed queue always reports itself as enabled.
    fn enabled(&self) -> bool {
        true
    }

    /// Publishes every operation of `write` to the configured topic, one
    /// message per operation, all keyed by the write's partition key.
    ///
    /// Operations are encoded and sent one after another, awaiting the
    /// producer's acknowledgement before moving on to the next one.
    ///
    /// # Errors
    ///
    /// Returns a [`QueueError`] as soon as an operation cannot be encoded or
    /// its send is not acknowledged. Operations that precede the failing one
    /// have already been published at that point.
    fn enqueue<'a>(
        &'a self,
        write: QueuedWrite,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), QueueError>> + Send + 'a>>
    {
        Box::pin(async move {
            use prost::Message;

            // `write` is owned by this future, so the key bytes can simply be
            // borrowed from it; no copy of the partition key is needed.
            let key = write.partition_key.as_bytes();
            for op in write.batch.ops() {
                let payload = envelope(&write, op, self.encoding)?.encode_to_vec();
                self.producer
                    .send_acked(&self.topic, key, &payload)
                    .await
                    .map_err(|_| QueueError {
                        reason: "fan-out enqueue was not acknowledged",
                    })?;
            }
            Ok(())
        })
    }
}
/// Wires Kafka fan-out into `pipeline` when the configuration asks for it.
///
/// Without a `fanout` section in `cfg` the pipeline is returned untouched.
/// Otherwise a producer is connected to the configured brokers (over TLS when
/// a `tls` section is present, with a client certificate only when both the
/// certificate and key paths are set), a one-line summary is printed to
/// stdout, and the pipeline is returned with the write queue, the baseline
/// write mode and the delete-by-query expansion flag applied.
///
/// # Errors
///
/// Returns a message describing the failure when the producer cannot connect.
#[cfg(feature = "fanout")]
pub(crate) async fn attach<R, S>(
    pipeline: osproxy_engine::Pipeline<R, S>,
    cfg: &osproxy_config::Config,
) -> Result<osproxy_engine::Pipeline<R, S>, String>
where
    R: osproxy_tenancy::Router,
    S: osproxy_sink::Sink + osproxy_sink::Reader,
{
    use osproxy_engine::WriteMode;
    use osproxy_kafka_krafka::{AuthConfig, KrafkaProducer, TlsConfig as KafkaTlsConfig};
    use std::sync::Arc;

    let Some(fanout) = &cfg.fanout else {
        return Ok(pipeline);
    };

    let auth = fanout.tls.as_ref().map(|tls| {
        let base = KafkaTlsConfig::new()
            .with_ca_cert(&tls.ca_path)
            .with_kafka_alpn();
        // A client certificate is attached only when both halves are present.
        let tls_config = match (&tls.client_cert_path, &tls.client_key_path) {
            (Some(cert), Some(key)) => base.with_client_cert(cert, key),
            _ => base,
        };
        AuthConfig::ssl(tls_config)
    });

    let connected = KrafkaProducer::connect(fanout.brokers.clone(), "osproxy-fanout", auth).await;
    let producer = match connected {
        Ok(producer) => producer,
        Err(e) => return Err(format!("connecting fan-out producer: {}", e.reason)),
    };

    let encoding = match fanout.body_encoding {
        osproxy_config::FanoutBodyEncoding::Json => BodyEncoding::Json,
        osproxy_config::FanoutBodyEncoding::Cbor => BodyEncoding::Cbor,
    };
    let mode = if fanout.async_default {
        WriteMode::Async
    } else {
        WriteMode::Sync
    };

    println!(
        "osproxy fanout: on (topic={}, brokers={}, tls={}, body={:?}, default={:?}, dbq_expand={})",
        fanout.topic,
        fanout.brokers.len(),
        fanout.tls.is_some(),
        fanout.body_encoding,
        mode,
        fanout.expand_delete_by_query,
    );

    let queue = KafkaWriteQueue::new(Arc::new(producer), fanout.topic.clone(), encoding);
    let wired = pipeline
        .with_write_queue(Arc::new(queue))
        .with_baseline_write_mode(mode)
        .with_delete_by_query_expansion(fanout.expand_delete_by_query);
    Ok(wired)
}
/// Variant of `attach` for binaries built without the `fanout` feature.
///
/// The pipeline is returned untouched when no fan-out is configured.
///
/// # Errors
///
/// Fails when the configuration contains a fan-out section, because this
/// build has no way to honor it.
#[cfg(not(feature = "fanout"))]
// Nothing is awaited here; the function stays `async` so its signature matches
// the `fanout`-enabled variant of `attach`.
#[allow(clippy::unused_async)]
pub(crate) async fn attach<R, S>(
    pipeline: osproxy_engine::Pipeline<R, S>,
    cfg: &osproxy_config::Config,
) -> Result<osproxy_engine::Pipeline<R, S>, String>
where
    R: osproxy_tenancy::Router,
    S: osproxy_sink::Sink + osproxy_sink::Reader,
{
    match &cfg.fanout {
        None => Ok(pipeline),
        Some(_) => Err(String::from(
            "fan-out is configured (fanout_kafka_brokers/fanout_topic) but this binary \
             was built without the `fanout` feature; rebuild with \
             `--features fanout`",
        )),
    }
}
// Unit tests for envelope construction; no Kafka producer is involved.
#[cfg(test)]
mod tests {
use super::*;
use osproxy_core::{ClusterId, Epoch, IndexName, Target};
use osproxy_sink::WriteBatch;
use prost::Message;
// Wraps `doc` in a single-op batch targeting cluster "eu-1" / index "shared"
// at epoch 4, queued under op id "op-1" and partition key "acme".
fn write(doc: DocOp) -> QueuedWrite {
let op = WriteOp::new(
Target::new(ClusterId::from("eu-1"), IndexName::from("shared")),
doc,
Epoch::new(4),
);
QueuedWrite {
op_id: "op-1".to_owned(),
partition_key: "acme".to_owned(),
batch: WriteBatch::single(op),
}
}
// An index op encoded as CBOR must survive a protobuf encode/decode cycle
// with all metadata intact, and its body must decode back to the same JSON value.
#[test]
fn cbor_envelope_round_trips_metadata_and_body() {
let json = br#"{"_tenant":"acme","id":7,"msg":"hi"}"#;
let w = write(DocOp::Index {
id: Some("acme:7".to_owned()),
routing: Some("acme".to_owned()),
body: bytes::Bytes::from_static(json),
});
let env = envelope(&w, &w.batch.ops()[0], BodyEncoding::Cbor).unwrap();
let decoded = OpEnvelope::decode(env.encode_to_vec().as_slice()).unwrap();
assert_eq!(decoded.op_id, "op-1");
assert_eq!(decoded.partition, "acme");
assert_eq!(decoded.cluster, "eu-1");
assert_eq!(decoded.index, "shared");
assert_eq!(decoded.epoch, 4);
assert_eq!(decoded.op_type, OpType::Index as i32);
assert_eq!(decoded.id, "acme:7");
assert_eq!(decoded.routing, "acme");
assert_eq!(decoded.content_type, "application/cbor");
let value: serde_json::Value = ciborium::from_reader(decoded.body.as_slice()).unwrap();
assert_eq!(
value,
serde_json::json!({"_tenant":"acme","id":7,"msg":"hi"})
);
}
// With JSON encoding the body bytes are forwarded unchanged, and an absent
// document id shows up as an empty string.
#[test]
fn json_encoding_keeps_the_body_verbatim() {
let json = br#"{"id":1}"#;
let w = write(DocOp::Index {
id: None,
routing: None,
body: bytes::Bytes::from_static(json),
});
let env = envelope(&w, &w.batch.ops()[0], BodyEncoding::Json).unwrap();
assert_eq!(env.content_type, "application/json");
assert_eq!(env.body, json.to_vec());
assert_eq!(env.id, ""); }
// A delete has no document body, so both the body and the content type are empty.
#[test]
fn delete_envelope_carries_no_body() {
let w = write(DocOp::Delete {
id: "acme:7".to_owned(),
routing: Some("acme".to_owned()),
});
let env = envelope(&w, &w.batch.ops()[0], BodyEncoding::Cbor).unwrap();
assert_eq!(env.op_type, OpType::Delete as i32);
assert_eq!(env.id, "acme:7");
assert!(env.body.is_empty());
assert_eq!(env.content_type, "");
}
}
// Extra test module, compiled only for test builds with the `fanout` feature;
// its source lives in `fanout_kafka_test.rs`.
// NOTE(review): presumably exercises the Kafka-backed queue end to end — confirm.
#[cfg(all(test, feature = "fanout"))]
#[path = "fanout_kafka_test.rs"]
mod kafka_round_trip;