kitsune2_api/op_store.rs
1//! Kitsune2 op store types.
2
3use crate::{
4 builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
5};
6use bytes::Bytes;
7use futures::future::BoxFuture;
8#[cfg(feature = "mockall")]
9use mockall::automock;
10use std::cmp::Ordering;
11use std::sync::Arc;
12
// Pulls the contents of `../proto/gen/kitsune2.op_store.rs` into this module
// at compile time via `include!`.
// NOTE(review): judging by the path this is generated protobuf code that
// should not be edited by hand -- confirm against the build setup.
pub(crate) mod proto {
    include!("../proto/gen/kitsune2.op_store.rs");
}
16
17pub use proto::Op;
18
19/// An op with metadata.
20///
21/// This is the basic unit of data in the kitsune2 system.
22#[derive(Debug, Clone, Eq, PartialEq)]
23pub struct MetaOp {
24 /// The id of the op.
25 pub op_id: OpId,
26
27 /// The actual op data.
28 pub op_data: Bytes,
29}
30
31impl From<Bytes> for Op {
32 fn from(value: Bytes) -> Self {
33 Self { data: value }
34 }
35}
36
37/// An op that has been stored by the Kitsune host.
38///
39/// This is the basic unit of data that the host is expected to store. Whether that storage is
40/// persistent may depend on the use-case. While Kitsune is holding references by hash, the host
41/// is expected to store the actual data.
42#[derive(Debug, Clone, Eq, PartialEq)]
43pub struct StoredOp {
44 /// The id of the op.
45 pub op_id: OpId,
46
47 /// The creation timestamp of the op.
48 ///
49 /// This must be the same for everyone who sees this op.
50 ///
51 /// Note that this means any op implementation must include a consistent timestamp in the op
52 /// data so that it can be provided back to Kitsune.
53 pub created_at: Timestamp,
54}
55
56impl Ord for StoredOp {
57 fn cmp(&self, other: &Self) -> Ordering {
58 (&self.created_at, &self.op_id).cmp(&(&other.created_at, &other.op_id))
59 }
60}
61
62impl PartialOrd for StoredOp {
63 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
64 Some(self.cmp(other))
65 }
66}
67
68/// The API that a kitsune2 host must implement to provide data persistence for kitsune2.
69#[cfg_attr(any(test, feature = "mockall"), automock)]
70pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
71 /// Process incoming ops.
72 ///
73 /// Pass the incoming ops to the host for processing. The host is expected to store the ops
74 /// if it is able to process them.
75 fn process_incoming_ops(
76 &self,
77 op_list: Vec<Bytes>,
78 ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
79
80 /// Retrieve a batch of ops from the host by time range.
81 ///
82 /// This must be the timestamp of the op, not the time that we saw the op or chose to store it.
83 /// The returned ops must be ordered by timestamp, ascending.
84 ///
85 /// # Returns
86 ///
87 /// - As many op ids as can be returned within the `limit_bytes` limit, within the arc and time
88 /// bounds.
89 /// - The total size of the op data that is pointed to by the returned op ids.
90 fn retrieve_op_hashes_in_time_slice(
91 &self,
92 arc: DhtArc,
93 start: Timestamp,
94 end: Timestamp,
95 ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32)>>;
96
97 /// Retrieve a list of ops by their op ids.
98 ///
99 /// This should be used to get op data for ops.
100 fn retrieve_ops(
101 &self,
102 op_ids: Vec<OpId>,
103 ) -> BoxFuture<'_, K2Result<Vec<MetaOp>>>;
104
105 /// Filter the passed in op ids by ops that exist in the store.
106 ///
107 /// # Returns
108 ///
109 /// The filtered list of op ids that do not exist in the store.
110 fn filter_out_existing_ops(
111 &self,
112 op_ids: Vec<OpId>,
113 ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
114
115 /// Retrieve a size-bounded list of op ids that have been stored within the given `arc` since
116 /// `start`.
117 ///
118 /// The `start` timestamp is used to retrieve ops by their `stored_at` timestamp rather than
119 /// their creation timestamp. This means that the `start` value can be used to page an op
120 /// store.
121 ///
122 /// The `limit_bytes` applies to the size of the op data, not the size of the op ids. This can
123 /// be thought of as a "page size" for the op data. Where the size is the size of the data
124 /// rather than the number of ops.
125 ///
126 /// If the limit is applied, then the timestamp of the last op id is returned.
127 /// Otherwise, the timestamp for when this operation started is returned.
128 /// Either way, the returned timestamp should be used as the `start` value for the next call
129 /// to this op store.
130 ///
131 /// # Returns
132 ///
133 /// - As many op ids as can be returned within the `limit_bytes` limit.
134 /// - The total size of the op data that is pointed to by the returned op ids.
135 /// - A new timestamp to be used for the next query.
136 fn retrieve_op_ids_bounded(
137 &self,
138 arc: DhtArc,
139 start: Timestamp,
140 limit_bytes: u32,
141 ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32, Timestamp)>>;
142
143 /// Get the earliest op timestamp in the given arc.
144 ///
145 /// If there are no ops in the arc, return None.
146 /// Otherwise, return the earliest `created_at` timestamp of the ops in the arc.
147 ///
148 /// This is used to determine the earliest timestamp that the arc has seen. That is a lower
149 /// bound on the work that needs to be done when building a summary model of the DHT.
150 fn earliest_timestamp_in_arc(
151 &self,
152 arc: DhtArc,
153 ) -> BoxFuture<'_, K2Result<Option<Timestamp>>>;
154
155 /// Store the combined hash of a time slice.
156 fn store_slice_hash(
157 &self,
158 arc: DhtArc,
159 slice_index: u64,
160 slice_hash: Bytes,
161 ) -> BoxFuture<'_, K2Result<()>>;
162
163 /// Count the number of stored time slices.
164 fn slice_hash_count(&self, arc: DhtArc) -> BoxFuture<'_, K2Result<u64>>;
165
166 /// Retrieve the combined hash of a time slice.
167 ///
168 /// If the slice is not found, return None.
169 /// If the time slice is present, then it must be identical to what was stored by Kitsune2
170 /// using [Self::store_slice_hash].
171 fn retrieve_slice_hash(
172 &self,
173 arc: DhtArc,
174 slice_index: u64,
175 ) -> BoxFuture<'_, K2Result<Option<Bytes>>>;
176
177 /// Retrieve all slice hashes for a given arc.
178 fn retrieve_slice_hashes(
179 &self,
180 arc: DhtArc,
181 ) -> BoxFuture<'_, K2Result<Vec<(u64, Bytes)>>>;
182}
183
/// Trait-object [OpStore]: an [Arc] around a `dyn OpStore`.
pub type DynOpStore = Arc<dyn OpStore>;
186
187/// A factory for constructing [OpStore] instances.
188pub trait OpStoreFactory: 'static + Send + Sync + std::fmt::Debug {
189 /// Help the builder construct a default config from the chosen
190 /// module factories.
191 fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
192
193 /// Validate configuration.
194 fn validate_config(&self, config: &config::Config) -> K2Result<()>;
195
196 /// Construct an op store instance.
197 fn create(
198 &self,
199 builder: Arc<builder::Builder>,
200 space: SpaceId,
201 ) -> BoxFut<'static, K2Result<DynOpStore>>;
202}
203
/// Trait-object [OpStoreFactory]: an [Arc] around a `dyn OpStoreFactory`.
pub type DynOpStoreFactory = Arc<dyn OpStoreFactory>;
206
#[cfg(test)]
mod test {
    use super::*;
    use prost::Message;

    /// An [Op] must survive an encode/decode round trip unchanged.
    #[test]
    fn happy_meta_op_encode_decode() {
        // 128 bytes, all set to 1, as the op payload.
        let original = Op::from(Bytes::from(vec![1; 128]));

        let encoded = original.encode_to_vec();
        let decoded = Op::decode(&encoded[..]).unwrap();

        assert_eq!(decoded, original);
    }
}
220}