kitsune2_api/
op_store.rs

1//! Kitsune2 op store types.
2
3use crate::{
4    builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
5};
6use bytes::Bytes;
7use futures::future::BoxFuture;
8use std::cmp::Ordering;
9use std::sync::Arc;
10
/// Protobuf message types for the op store.
///
/// The contents are pulled in verbatim with `include!` from `proto/gen/kitsune2.op_store.rs`
/// (presumably generated by prost from the `.proto` definitions — confirm against the build
/// setup). `Op` is re-exported from this module as part of the public API.
pub(crate) mod proto {
    include!("../proto/gen/kitsune2.op_store.rs");
}
14
15pub use proto::Op;
16
17/// An op with metadata.
18///
19/// This is the basic unit of data in the kitsune2 system.
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct MetaOp {
22    /// The id of the op.
23    pub op_id: OpId,
24
25    /// The actual op data.
26    pub op_data: Bytes,
27}
28
29impl From<Bytes> for Op {
30    fn from(value: Bytes) -> Self {
31        Self { data: value }
32    }
33}
34
35/// An op that has been stored by the Kitsune host.
36///
37/// This is the basic unit of data that the host is expected to store. Whether that storage is
38/// persistent may depend on the use-case. While Kitsune is holding references by hash, the host
39/// is expected to store the actual data.
40#[derive(Debug, Clone, Eq, PartialEq)]
41pub struct StoredOp {
42    /// The id of the op.
43    pub op_id: OpId,
44
45    /// The creation timestamp of the op.
46    ///
47    /// This must be the same for everyone who sees this op.
48    ///
49    /// Note that this means any op implementation must include a consistent timestamp in the op
50    /// data so that it can be provided back to Kitsune.
51    pub created_at: Timestamp,
52}
53
54impl Ord for StoredOp {
55    fn cmp(&self, other: &Self) -> Ordering {
56        (&self.created_at, &self.op_id).cmp(&(&other.created_at, &other.op_id))
57    }
58}
59
60impl PartialOrd for StoredOp {
61    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
62        Some(self.cmp(other))
63    }
64}
65
66/// The API that a kitsune2 host must implement to provide data persistence for kitsune2.
67#[cfg_attr(any(test, feature = "mockall"), mockall::automock)]
68pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
69    /// Process incoming ops.
70    ///
71    /// Pass the incoming ops to the host for processing. The host is expected to store the ops
72    /// if it is able to process them.
73    fn process_incoming_ops(
74        &self,
75        op_list: Vec<Bytes>,
76    ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
77
78    /// Retrieve a batch of ops from the host by time range.
79    ///
80    /// This must be the timestamp of the op, not the time that we saw the op or chose to store it.
81    /// The returned ops must be ordered by timestamp, ascending.
82    ///
83    /// # Returns
84    ///
85    /// - The op ids that are stored in the given arc and time range. Where the start time is
86    ///   inclusive and the end time is exclusive.
87    /// - The total size of the op data that is pointed to by the returned op ids.
88    fn retrieve_op_hashes_in_time_slice(
89        &self,
90        arc: DhtArc,
91        start: Timestamp,
92        end: Timestamp,
93    ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32)>>;
94
95    /// Retrieve a list of ops by their op ids.
96    ///
97    /// This should be used to get op data for ops.
98    fn retrieve_ops(
99        &self,
100        op_ids: Vec<OpId>,
101    ) -> BoxFuture<'_, K2Result<Vec<MetaOp>>>;
102
103    /// Filter the passed in op ids by ops that exist in the store.
104    ///
105    /// # Returns
106    ///
107    /// The filtered list of op ids that do not exist in the store.
108    fn filter_out_existing_ops(
109        &self,
110        op_ids: Vec<OpId>,
111    ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
112
113    /// Retrieve a size-bounded list of op ids that have been stored within the given `arc` since
114    /// `start`.
115    ///
116    /// The `start` timestamp is used to retrieve ops by their `stored_at` timestamp rather than
117    /// their creation timestamp. This means that the `start` value can be used to page an op
118    /// store.
119    ///
120    /// The `limit_bytes` applies to the size of the op data, not the size of the op ids. This can
121    /// be thought of as a "page size" for the op data. Where the size is the size of the data
122    /// rather than the number of ops.
123    ///
124    /// If the limit is applied, then the timestamp of the last op id is returned.
125    /// Otherwise, the timestamp for when this operation started is returned.
126    /// Either way, the returned timestamp should be used as the `start` value for the next call
127    /// to this op store.
128    ///
129    /// # Returns
130    ///
131    /// - As many op ids as can be returned within the `limit_bytes` limit.
132    /// - The total size of the op data that is pointed to by the returned op ids.
133    /// - A new timestamp to be used for the next query.
134    fn retrieve_op_ids_bounded(
135        &self,
136        arc: DhtArc,
137        start: Timestamp,
138        limit_bytes: u32,
139    ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32, Timestamp)>>;
140
141    /// Get the earliest op timestamp in the given arc.
142    ///
143    /// If there are no ops in the arc, return None.
144    /// Otherwise, return the earliest `created_at` timestamp of the ops in the arc.
145    ///
146    /// This is used to determine the earliest timestamp that the arc has seen. That is a lower
147    /// bound on the work that needs to be done when building a summary model of the DHT.
148    fn earliest_timestamp_in_arc(
149        &self,
150        arc: DhtArc,
151    ) -> BoxFuture<'_, K2Result<Option<Timestamp>>>;
152
153    /// Store the combined hash of a time slice.
154    fn store_slice_hash(
155        &self,
156        arc: DhtArc,
157        slice_index: u64,
158        slice_hash: Bytes,
159    ) -> BoxFuture<'_, K2Result<()>>;
160
161    /// Count the number of stored time slices.
162    fn slice_hash_count(&self, arc: DhtArc) -> BoxFuture<'_, K2Result<u64>>;
163
164    /// Retrieve the combined hash of a time slice.
165    ///
166    /// If the slice is not found, return None.
167    /// If the time slice is present, then it must be identical to what was stored by Kitsune2
168    /// using [Self::store_slice_hash].
169    fn retrieve_slice_hash(
170        &self,
171        arc: DhtArc,
172        slice_index: u64,
173    ) -> BoxFuture<'_, K2Result<Option<Bytes>>>;
174
175    /// Retrieve all slice hashes for a given arc.
176    fn retrieve_slice_hashes(
177        &self,
178        arc: DhtArc,
179    ) -> BoxFuture<'_, K2Result<Vec<(u64, Bytes)>>>;
180}
181
182/// Trait-object [OpStore].
183pub type DynOpStore = Arc<dyn OpStore>;
184
185/// A factory for constructing [OpStore] instances.
186pub trait OpStoreFactory: 'static + Send + Sync + std::fmt::Debug {
187    /// Help the builder construct a default config from the chosen
188    /// module factories.
189    fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
190
191    /// Validate configuration.
192    fn validate_config(&self, config: &config::Config) -> K2Result<()>;
193
194    /// Construct an op store instance.
195    fn create(
196        &self,
197        builder: Arc<builder::Builder>,
198        space: SpaceId,
199    ) -> BoxFut<'static, K2Result<DynOpStore>>;
200}
201
202/// Trait-object [OpStoreFactory].
203pub type DynOpStoreFactory = Arc<dyn OpStoreFactory>;
204
#[cfg(test)]
mod test {
    use super::*;
    use prost::Message;

    /// Encode an [Op] and decode it again, returning the decoded value.
    fn round_trip(op: &Op) -> Op {
        let op_enc = op.encode_to_vec();
        Op::decode(op_enc.as_slice()).unwrap()
    }

    /// A non-empty op payload survives a protobuf encode/decode round trip.
    ///
    /// This exercises [Op] (not `MetaOp`), so the test is named accordingly.
    #[test]
    fn happy_op_encode_decode() {
        let op = Op::from(Bytes::from(vec![1; 128]));

        assert_eq!(round_trip(&op), op);
    }

    /// An empty op payload also survives the round trip.
    ///
    /// A default-valued (empty) bytes field may be omitted from the encoding entirely. This
    /// checks that decoding such a message still yields an equal `Op`.
    #[test]
    fn empty_op_encode_decode() {
        let op = Op::from(Bytes::new());

        assert_eq!(round_trip(&op), op);
    }
}