kitsune2_api/op_store.rs
1//! Kitsune2 op store types.
2
3use crate::{
4 builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
5};
6use bytes::Bytes;
7use futures::future::BoxFuture;
8use std::cmp::Ordering;
9use std::sync::Arc;
10
11pub(crate) mod proto {
12 include!("../proto/gen/kitsune2.op_store.rs");
13}
14
15pub use proto::Op;
16
17/// An op with metadata.
18///
19/// This is the basic unit of data in the kitsune2 system.
20#[derive(Debug, Clone, Eq, PartialEq)]
21pub struct MetaOp {
22 /// The id of the op.
23 pub op_id: OpId,
24
25 /// The actual op data.
26 pub op_data: Bytes,
27}
28
29impl From<Bytes> for Op {
30 fn from(value: Bytes) -> Self {
31 Self { data: value }
32 }
33}
34
35/// An op that has been stored by the Kitsune host.
36///
37/// This is the basic unit of data that the host is expected to store. Whether that storage is
38/// persistent may depend on the use-case. While Kitsune is holding references by hash, the host
39/// is expected to store the actual data.
40#[derive(Debug, Clone, Eq, PartialEq)]
41pub struct StoredOp {
42 /// The id of the op.
43 pub op_id: OpId,
44
45 /// The creation timestamp of the op.
46 ///
47 /// This must be the same for everyone who sees this op.
48 ///
49 /// Note that this means any op implementation must include a consistent timestamp in the op
50 /// data so that it can be provided back to Kitsune.
51 pub created_at: Timestamp,
52}
53
54impl Ord for StoredOp {
55 fn cmp(&self, other: &Self) -> Ordering {
56 (&self.created_at, &self.op_id).cmp(&(&other.created_at, &other.op_id))
57 }
58}
59
60impl PartialOrd for StoredOp {
61 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
62 Some(self.cmp(other))
63 }
64}
65
66/// The API that a kitsune2 host must implement to provide data persistence for kitsune2.
67#[cfg_attr(any(test, feature = "mockall"), mockall::automock)]
68pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
69 /// Process incoming ops.
70 ///
71 /// Pass the incoming ops to the host for processing. The host is expected to store the ops
72 /// if it is able to process them.
73 fn process_incoming_ops(
74 &self,
75 op_list: Vec<Bytes>,
76 ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
77
78 /// Retrieve a batch of ops from the host by time range.
79 ///
80 /// This must be the timestamp of the op, not the time that we saw the op or chose to store it.
81 /// The returned ops must be ordered by timestamp, ascending.
82 ///
83 /// # Returns
84 ///
85 /// - The op ids that are stored in the given arc and time range. Where the start time is
86 /// inclusive and the end time is exclusive.
87 /// - The total size of the op data that is pointed to by the returned op ids.
88 fn retrieve_op_hashes_in_time_slice(
89 &self,
90 arc: DhtArc,
91 start: Timestamp,
92 end: Timestamp,
93 ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32)>>;
94
95 /// Retrieve a list of ops by their op ids.
96 ///
97 /// This should be used to get op data for ops.
98 fn retrieve_ops(
99 &self,
100 op_ids: Vec<OpId>,
101 ) -> BoxFuture<'_, K2Result<Vec<MetaOp>>>;
102
103 /// Filter the passed in op ids by ops that exist in the store.
104 ///
105 /// # Returns
106 ///
107 /// The filtered list of op ids that do not exist in the store.
108 fn filter_out_existing_ops(
109 &self,
110 op_ids: Vec<OpId>,
111 ) -> BoxFuture<'_, K2Result<Vec<OpId>>>;
112
113 /// Retrieve a size-bounded list of op ids that have been stored within the given `arc` since
114 /// `start`.
115 ///
116 /// The `start` timestamp is used to retrieve ops by their `stored_at` timestamp rather than
117 /// their creation timestamp. This means that the `start` value can be used to page an op
118 /// store.
119 ///
120 /// The `limit_bytes` applies to the size of the op data, not the size of the op ids. This can
121 /// be thought of as a "page size" for the op data. Where the size is the size of the data
122 /// rather than the number of ops.
123 ///
124 /// If the limit is applied, then the timestamp of the last op id is returned.
125 /// Otherwise, the timestamp for when this operation started is returned.
126 /// Either way, the returned timestamp should be used as the `start` value for the next call
127 /// to this op store.
128 ///
129 /// # Returns
130 ///
131 /// - As many op ids as can be returned within the `limit_bytes` limit.
132 /// - The total size of the op data that is pointed to by the returned op ids.
133 /// - A new timestamp to be used for the next query.
134 fn retrieve_op_ids_bounded(
135 &self,
136 arc: DhtArc,
137 start: Timestamp,
138 limit_bytes: u32,
139 ) -> BoxFuture<'_, K2Result<(Vec<OpId>, u32, Timestamp)>>;
140
141 /// Get the earliest op timestamp in the given arc.
142 ///
143 /// If there are no ops in the arc, return None.
144 /// Otherwise, return the earliest `created_at` timestamp of the ops in the arc.
145 ///
146 /// This is used to determine the earliest timestamp that the arc has seen. That is a lower
147 /// bound on the work that needs to be done when building a summary model of the DHT.
148 fn earliest_timestamp_in_arc(
149 &self,
150 arc: DhtArc,
151 ) -> BoxFuture<'_, K2Result<Option<Timestamp>>>;
152
153 /// Store the combined hash of a time slice.
154 fn store_slice_hash(
155 &self,
156 arc: DhtArc,
157 slice_index: u64,
158 slice_hash: Bytes,
159 ) -> BoxFuture<'_, K2Result<()>>;
160
161 /// Count the number of stored time slices.
162 fn slice_hash_count(&self, arc: DhtArc) -> BoxFuture<'_, K2Result<u64>>;
163
164 /// Retrieve the combined hash of a time slice.
165 ///
166 /// If the slice is not found, return None.
167 /// If the time slice is present, then it must be identical to what was stored by Kitsune2
168 /// using [Self::store_slice_hash].
169 fn retrieve_slice_hash(
170 &self,
171 arc: DhtArc,
172 slice_index: u64,
173 ) -> BoxFuture<'_, K2Result<Option<Bytes>>>;
174
175 /// Retrieve all slice hashes for a given arc.
176 fn retrieve_slice_hashes(
177 &self,
178 arc: DhtArc,
179 ) -> BoxFuture<'_, K2Result<Vec<(u64, Bytes)>>>;
180}
181
182/// Trait-object [OpStore].
183pub type DynOpStore = Arc<dyn OpStore>;
184
185/// A factory for constructing [OpStore] instances.
186pub trait OpStoreFactory: 'static + Send + Sync + std::fmt::Debug {
187 /// Help the builder construct a default config from the chosen
188 /// module factories.
189 fn default_config(&self, config: &mut config::Config) -> K2Result<()>;
190
191 /// Validate configuration.
192 fn validate_config(&self, config: &config::Config) -> K2Result<()>;
193
194 /// Construct an op store instance.
195 fn create(
196 &self,
197 builder: Arc<builder::Builder>,
198 space: SpaceId,
199 ) -> BoxFut<'static, K2Result<DynOpStore>>;
200}
201
202/// Trait-object [OpStoreFactory].
203pub type DynOpStoreFactory = Arc<dyn OpStoreFactory>;
204
#[cfg(test)]
mod test {
    use super::*;
    use prost::Message;

    /// An [Op] built from raw bytes must survive a protobuf encode/decode round trip unchanged.
    #[test]
    fn happy_meta_op_encode_decode() {
        let original: Op = Bytes::from(vec![1u8; 128]).into();

        let wire = original.encode_to_vec();
        let round_tripped = Op::decode(&wire[..]).unwrap();

        assert_eq!(original, round_tripped);
    }
}