osproxy_sink/sink.rs
1//! The [`Sink`] trait: where writes go, decoupled from how routing is decided.
2
3use crate::ack::WriteAck;
4use crate::batch::WriteBatch;
5use crate::error::SinkError;
6
7/// Where writes go.
8///
9/// Isolating the destination behind a trait keeps routing decisions independent
10/// of delivery (`docs/decisions/008`): `OpenSearchSink` writes directly to a
11/// cluster today, and a future `QueueSink` (Kafka) can take the *same*
12/// [`WriteBatch`] for the redundancy mode with no change to the engine.
13///
14/// # Invariants
15///
16/// - MUST NOT panic; return [`SinkError`] for every failure (NFR-R1).
17/// - The returned [`WriteAck`] MUST carry one result per batch operation, in the
18/// batch's original order, so a bulk response can be re-interleaved (M3).
19///
20/// # Examples
21///
22/// ```
23/// use osproxy_core::{ClusterId, Epoch, IndexName, Target};
24/// use osproxy_sink::{DocOp, MemorySink, Sink, WriteBatch, WriteOp};
25///
26/// # async fn demo() {
27/// let sink = MemorySink::new();
28/// let op = WriteOp::new(
29/// Target::new(ClusterId::from("c"), IndexName::from("i")),
30/// DocOp::Index { id: Some("p:1".into()), routing: Some("p".into()), body: bytes::Bytes::from_static(b"{}") },
31/// Epoch::new(1),
32/// );
33/// let ack = sink.write(WriteBatch::single(op)).await.unwrap();
34/// assert!(ack.all_succeeded());
35/// assert_eq!(sink.recorded().len(), 1);
36/// # }
37/// ```
38pub trait Sink: Send + Sync {
39 /// Applies a batch of writes, returning a per-operation acknowledgement.
40 ///
41 /// # Errors
42 ///
43 /// Returns [`SinkError`] if the upstream rejects the whole request, the
44 /// write cannot be delivered, or the epoch is stale (M5).
45 fn write(
46 &self,
47 batch: WriteBatch,
48 ) -> impl std::future::Future<Output = Result<WriteAck, SinkError>> + Send;
49}
50
#[cfg(test)]
mod tests {
    use super::*;
    use crate::batch::{DocOp, WriteOp};
    use crate::MemorySink;
    use osproxy_core::{ClusterId, Epoch, IndexName, Target};

    /// Writes a single index operation through a `MemorySink` and checks both
    /// the acknowledgement and what the sink recorded (including the epoch).
    #[tokio::test]
    async fn memory_sink_records_and_acks() {
        let memory = MemorySink::new();

        // Build the one operation piece by piece before wrapping it in a batch.
        let target = Target::new(ClusterId::from("c"), IndexName::from("i"));
        let doc = DocOp::Index {
            id: Some(String::from("p:1")),
            routing: Some(String::from("p")),
            body: bytes::Bytes::from_static(b"{}"),
        };
        let single = WriteBatch::single(WriteOp::new(target, doc, Epoch::new(3)));

        let ack = memory.write(single).await.unwrap();

        // Acknowledgement side: everything succeeded and the id is echoed back.
        assert!(ack.all_succeeded());
        assert_eq!(ack.results()[0].id, "p:1");

        // Sink side: exactly one batch recorded, carrying the original epoch.
        assert_eq!(memory.recorded().len(), 1);
        assert_eq!(memory.recorded()[0].ops()[0].epoch, Epoch::new(3));
    }
}
76}