Skip to main content

osproxy_sink/
sink.rs

1//! The [`Sink`] trait: where writes go, decoupled from how routing is decided.
2
3use crate::ack::WriteAck;
4use crate::batch::WriteBatch;
5use crate::error::SinkError;
6
7/// Where writes go.
8///
9/// Isolating the destination behind a trait keeps routing decisions independent
10/// of delivery (`docs/decisions/008`): `OpenSearchSink` writes directly to a
11/// cluster today, and a future `QueueSink` (Kafka) can take the *same*
12/// [`WriteBatch`] for the redundancy mode with no change to the engine.
13///
14/// # Invariants
15///
16/// - MUST NOT panic; return [`SinkError`] for every failure (NFR-R1).
17/// - The returned [`WriteAck`] MUST carry one result per batch operation, in the
18///   batch's original order, so a bulk response can be re-interleaved (M3).
19///
20/// # Examples
21///
22/// ```
23/// use osproxy_core::{ClusterId, Epoch, IndexName, Target};
24/// use osproxy_sink::{DocOp, MemorySink, Sink, WriteBatch, WriteOp};
25///
26/// # async fn demo() {
27/// let sink = MemorySink::new();
28/// let op = WriteOp::new(
29///     Target::new(ClusterId::from("c"), IndexName::from("i")),
30///     DocOp::Index { id: Some("p:1".into()), routing: Some("p".into()), body: bytes::Bytes::from_static(b"{}") },
31///     Epoch::new(1),
32/// );
33/// let ack = sink.write(WriteBatch::single(op)).await.unwrap();
34/// assert!(ack.all_succeeded());
35/// assert_eq!(sink.recorded().len(), 1);
36/// # }
37/// ```
38pub trait Sink: Send + Sync {
39    /// Applies a batch of writes, returning a per-operation acknowledgement.
40    ///
41    /// # Errors
42    ///
43    /// Returns [`SinkError`] if the upstream rejects the whole request, the
44    /// write cannot be delivered, or the epoch is stale (M5).
45    fn write(
46        &self,
47        batch: WriteBatch,
48    ) -> impl std::future::Future<Output = Result<WriteAck, SinkError>> + Send;
49}
50
51#[cfg(test)]
52mod tests {
53    use super::*;
54    use crate::batch::{DocOp, WriteOp};
55    use crate::MemorySink;
56    use osproxy_core::{ClusterId, Epoch, IndexName, Target};
57
58    #[tokio::test]
59    async fn memory_sink_records_and_acks() {
60        let sink = MemorySink::new();
61        let op = WriteOp::new(
62            Target::new(ClusterId::from("c"), IndexName::from("i")),
63            DocOp::Index {
64                id: Some("p:1".to_owned()),
65                routing: Some("p".to_owned()),
66                body: bytes::Bytes::from_static(b"{}"),
67            },
68            Epoch::new(3),
69        );
70        let ack = sink.write(WriteBatch::single(op)).await.unwrap();
71        assert!(ack.all_succeeded());
72        assert_eq!(ack.results()[0].id, "p:1");
73        assert_eq!(sink.recorded().len(), 1);
74        assert_eq!(sink.recorded()[0].ops()[0].epoch, Epoch::new(3));
75    }
76}