Skip to main content

osproxy_sink/
ack.rs

1//! The result of applying a [`WriteBatch`](crate::WriteBatch) at a sink.
2
3/// The outcome of a single operation in a batch, positionally aligned with the
4/// batch's operations (so a `_bulk` response can be re-interleaved in M3).
5#[derive(Clone, PartialEq, Eq, Debug)]
6pub struct OpResult {
7    /// The document id the operation acted on (constructed or auto-assigned).
8    pub id: String,
9    /// The upstream HTTP status for this operation.
10    pub status: u16,
11    /// Whether the document was newly created (vs. updated).
12    pub created: bool,
13}
14
15impl OpResult {
16    /// Constructs an operation result.
17    #[must_use]
18    pub fn new(id: impl Into<String>, status: u16, created: bool) -> Self {
19        Self {
20            id: id.into(),
21            status,
22            created,
23        }
24    }
25
26    /// Whether the upstream status indicates success (2xx).
27    #[must_use]
28    pub fn is_success(&self) -> bool {
29        (200..300).contains(&self.status)
30    }
31}
32
33/// The acknowledgement for a whole batch: one [`OpResult`] per operation, in the
34/// batch's original order.
35#[derive(Clone, PartialEq, Eq, Debug, Default)]
36pub struct WriteAck {
37    results: Vec<OpResult>,
38    pool_reuse: bool,
39}
40
41impl WriteAck {
42    /// An ack with the given per-operation results.
43    #[must_use]
44    pub fn new(results: Vec<OpResult>) -> Self {
45        Self {
46            results,
47            pool_reuse: false,
48        }
49    }
50
51    /// Records whether the dispatch(es) rode reused pooled connections, true
52    /// only when every operation in the batch reused one (NFR-P telemetry).
53    #[must_use]
54    pub fn with_pool_reuse(mut self, reused: bool) -> Self {
55        self.pool_reuse = reused;
56        self
57    }
58
59    /// Whether this batch's dispatch rode reused pooled connection(s).
60    #[must_use]
61    pub fn pool_reuse(&self) -> bool {
62        self.pool_reuse
63    }
64
65    /// The per-operation results, in batch order.
66    #[must_use]
67    pub fn results(&self) -> &[OpResult] {
68        &self.results
69    }
70
71    /// Whether every operation in the batch succeeded.
72    #[must_use]
73    pub fn all_succeeded(&self) -> bool {
74        self.results.iter().all(OpResult::is_success)
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81
82    #[test]
83    fn success_is_2xx() {
84        assert!(OpResult::new("a", 201, true).is_success());
85        assert!(OpResult::new("a", 200, false).is_success());
86        assert!(!OpResult::new("a", 409, false).is_success());
87    }
88
89    #[test]
90    fn ack_aggregates_success() {
91        let ack = WriteAck::new(vec![
92            OpResult::new("a", 201, true),
93            OpResult::new("b", 200, false),
94        ]);
95        assert!(ack.all_succeeded());
96        assert_eq!(ack.results().len(), 2);
97
98        let mixed = WriteAck::new(vec![
99            OpResult::new("a", 201, true),
100            OpResult::new("b", 503, false),
101        ]);
102        assert!(!mixed.all_succeeded());
103    }
104}