kitsune2_api/
op_store.rs

1//! Kitsune2 op store types.
2
3use crate::{
4    builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
5};
6use bytes::Bytes;
7use futures::future::BoxFuture;
8#[cfg(feature = "mockall")]
9use mockall::automock;
10use std::cmp::Ordering;
11use std::sync::Arc;
12
13pub(crate) mod proto {
14    include!("../proto/gen/kitsune2.op_store.rs");
15}
16
17pub use proto::Op;
18
19/// An op with metadata.
20///
21/// This is the basic unit of data in the kitsune2 system.
22#[derive(Debug, Clone, Eq, PartialEq)]
23pub struct MetaOp {
24    /// The id of the op.
25    pub op_id: OpId,
26
27    /// The actual op data.
28    pub op_data: Bytes,
29}
30
31impl From<Bytes> for Op {
32    fn from(value: Bytes) -> Self {
33        Self { data: value }
34    }
35}
36
37/// An op that has been stored by the Kitsune host.
38///
39/// This is the basic unit of data that the host is expected to store. Whether that storage is
40/// persistent may depend on the use-case. While Kitsune is holding references by hash, the host
41/// is expected to store the actual data.
42#[derive(Debug, Clone, Eq, PartialEq)]
43pub struct StoredOp {
44    /// The id of the op.
45    pub op_id: OpId,
46
47    /// The creation timestamp of the op.
48    ///
49    /// This must be the same for everyone who sees this op.
50    ///
51    /// Note that this means any op implementation must include a consistent timestamp in the op
52    /// data so that it can be provided back to Kitsune.
53    pub created_at: Timestamp,
54}
55
56impl Ord for StoredOp {
57    fn cmp(&self, other: &Self) -> Ordering {
58        (&self.created_at, &self.op_id).cmp(&(&other.created_at, &other.op_id))
59    }
60}
61
62impl PartialOrd for StoredOp {
63    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
64        Some(self.cmp(other))
65    }
66}
67
68/// The API that a kitsune2 host must implement to provide data persistence for kitsune2.
69#[cfg_attr(any(test, feature = "mockall"), automock)]
70pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
71    /// Process incoming ops.
72    ///
73    /// Pass the incoming ops to the host for processing. The host is expected to store the ops
74    /// if it is able to process them.
75    fn process_incoming_ops(
76        &self,
77        op_list: Vec<Bytes>,
78    ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
79
80    /// Retrieve a batch of ops from the host by time range.
81    ///
82    /// This must be the timestamp of the op, not the time that we saw the op or chose to store it.
83    /// The returned ops must be ordered by timestamp, ascending.
84    ///
85    /// # Returns
86    ///
87    /// - As many op ids as can be returned within the `limit_bytes` limit, within the arc and time
88    ///   bounds.
89    /// - The total size of the op data that is pointed to by the returned op ids.
90    fn retrieve_op_hashes_in_time_slice(
91        &self,
92        arc: DhtArc,
93        start: Timestamp,
94        end: Timestamp,
95    ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32)>>;
96
97    /// Retrieve a list of ops by their op ids.
98    ///
99    /// This should be used to get op data for ops.
100    fn retrieve_ops(
101        &self,
102        op_ids: Vec<OpId>,
103    ) -> BoxFuture<'_, K2Result<Vec<MetaOp>>>;
104
105    /// Filter the passed in op ids by ops that exist in the store.
106    ///
107    /// # Returns
108    ///
109    /// The filtered list of op ids that do not exist in the store.
110    fn filter_out_existing_ops(
111        &self,
112        op_ids: Vec<OpId>,
113    ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
114
115    /// Retrieve a size-bounded list of op ids that have been stored within the given `arc` since
116    /// `start`.
117    ///
118    /// The `start` timestamp is used to retrieve ops by their `stored_at` timestamp rather than
119    /// their creation timestamp. This means that the `start` value can be used to page an op
120    /// store.
121    ///
122    /// The `limit_bytes` applies to the size of the op data, not the size of the op ids. This can
123    /// be thought of as a "page size" for the op data. Where the size is the size of the data
124    /// rather than the number of ops.
125    ///
126    /// If the limit is applied, then the timestamp of the last op id is returned.
127    /// Otherwise, the timestamp for when this operation started is returned.
128    /// Either way, the returned timestamp should be used as the `start` value for the next call
129    /// to this op store.
130    ///
131    /// # Returns
132    ///
133    /// - As many op ids as can be returned within the `limit_bytes` limit.
134    /// - The total size of the op data that is pointed to by the returned op ids.
135    /// - A new timestamp to be used for the next query.
136    fn retrieve_op_ids_bounded(
137        &self,
138        arc: DhtArc,
139        start: Timestamp,
140        limit_bytes: u32,
141    ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32, Timestamp)>>;
142
143    /// Get the earliest op timestamp in the given arc.
144    ///
145    /// If there are no ops in the arc, return None.
146    /// Otherwise, return the earliest `created_at` timestamp of the ops in the arc.
147    ///
148    /// This is used to determine the earliest timestamp that the arc has seen. That is a lower
149    /// bound on the work that needs to be done when building a summary model of the DHT.
150    fn earliest_timestamp_in_arc(
151        &self,
152        arc: DhtArc,
153    ) -> BoxFuture<'_, K2Result<Option<Timestamp>>>;
154
155    /// Store the combined hash of a time slice.
156    fn store_slice_hash(
157        &self,
158        arc: DhtArc,
159        slice_index: u64,
160        slice_hash: Bytes,
161    ) -> BoxFuture<'_, K2Result<()>>;
162
163    /// Count the number of stored time slices.
164    fn slice_hash_count(&self, arc: DhtArc) -> BoxFuture<'_, K2Result<u64>>;
165
166    /// Retrieve the combined hash of a time slice.
167    ///
168    /// If the slice is not found, return None.
169    /// If the time slice is present, then it must be identical to what was stored by Kitsune2
170    /// using [Self::store_slice_hash].
171    fn retrieve_slice_hash(
172        &self,
173        arc: DhtArc,
174        slice_index: u64,
175    ) -> BoxFuture<'_, K2Result<Option<Bytes>>>;
176
177    /// Retrieve all slice hashes for a given arc.
178    fn retrieve_slice_hashes(
179        &self,
180        arc: DhtArc,
181    ) -> BoxFuture<'_, K2Result<Vec<(u64, Bytes)>>>;
182}
183
184/// Trait-object [OpStore].
185pub type DynOpStore = Arc<dyn OpStore>;
186
187/// A factory for constructing [OpStore] instances.
188pub trait OpStoreFactory: 'static + Send + Sync + std::fmt::Debug {
189    /// Help the builder construct a default config from the chosen
190    /// module factories.
191    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
192
193    /// Validate configuration.
194    fn validate_config(&self, config: &config::Config) -> K2Result<()>;
195
196    /// Construct an op store instance.
197    fn create(
198        &self,
199        builder: Arc<builder::Builder>,
200        space: SpaceId,
201    ) -> BoxFut<'static, K2Result<DynOpStore>>;
202}
203
204/// Trait-object [OpStoreFactory].
205pub type DynOpStoreFactory = Arc<dyn OpStoreFactory>;
206
#[cfg(test)]
mod test {
    use super::*;
    use prost::Message;

    #[test]
    fn happy_meta_op_encode_decode() {
        // Round-trip a 128-byte op through its protobuf encoding.
        let payload = Bytes::from(vec![1; 128]);
        let op: Op = payload.into();

        let encoded: Vec<u8> = op.encode_to_vec();
        let decoded = Op::decode(&encoded[..]).unwrap();

        assert_eq!(decoded, op);
    }
}