Skip to main content

drasi_core/interface/
outbox_writer.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Persistent outbox writer trait for reaction recovery.
16//!
17//! The outbox is a bounded ring buffer of serialized [`QueryResult`] entries,
18//! keyed by query ID and sequence number. Reactions read the outbox on restart
19//! to replay events missed while they were stopped.
20//!
21//! [`QueryResult`]: Used in the `lib` crate; this trait operates on serialized
22//! bytes so that storage backends (RocksDB, Garnet) don't depend on `drasi-lib`.
23//!
24//! ## Key semantics
25//!
26//! - **append**: Store a result at a given sequence. The outbox is bounded by
27//!   capacity; implementations may evict the oldest entries.
28//! - **read_from**: Return all entries with sequence > `after_sequence`, in order.
29//! - **trim_to_capacity**: Explicitly evict oldest entries beyond a limit.
30
31use async_trait::async_trait;
32
33use super::IndexError;
34
35/// Persistent outbox storage for query result replay.
36///
37/// Implementations store serialized `QueryResult` bytes keyed by `(query_id, sequence)`.
38/// The `lib` layer handles serialization/deserialization.
39#[async_trait]
40pub trait OutboxWriter: Send + Sync {
41    /// Append a serialized query result entry.
42    ///
43    /// If the outbox already contains an entry at this sequence, it is overwritten.
44    async fn append(&self, query_id: &str, sequence: u64, data: &[u8]) -> Result<(), IndexError>;
45
46    /// Read all entries with sequence strictly greater than `after_sequence`.
47    ///
48    /// Returns entries in ascending sequence order as `(sequence, data)` pairs.
49    /// Returns an empty vec if no entries exist after the given sequence.
50    async fn read_from(
51        &self,
52        query_id: &str,
53        after_sequence: u64,
54    ) -> Result<Vec<(u64, Vec<u8>)>, IndexError>;
55
56    /// Read the highest sequence number stored for this query.
57    ///
58    /// Returns `None` if the outbox is empty for this query.
59    async fn read_latest_sequence(&self, query_id: &str) -> Result<Option<u64>, IndexError>;
60
61    /// Delete all outbox entries for a query.
62    ///
63    /// Used during `AutoReset` recovery and reaction deprovisioning.
64    async fn clear(&self, query_id: &str) -> Result<(), IndexError>;
65
66    /// Trim the outbox to at most `capacity` entries, removing the oldest.
67    ///
68    /// Returns the number of entries removed. If the outbox has ≤ `capacity`
69    /// entries, this is a no-op returning 0.
70    async fn trim_to_capacity(&self, query_id: &str, capacity: usize) -> Result<usize, IndexError>;
71}