Skip to main content

soil_statement_store/
store_api.rs

// This file is part of Soil.

// Copyright (C) Soil contributors.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0 OR GPL-3.0-or-later WITH Classpath-exception-2.0
6
7pub use crate::runtime_api::StatementSource;
8use crate::{Hash, Statement, Topic, MAX_ANY_TOPICS, MAX_TOPICS};
9use std::collections::HashSet;
10use subsoil::core::{bounded_vec::BoundedVec, Bytes, ConstU32};
11
12/// Statement store error.
13#[derive(Debug, Clone, Eq, PartialEq, thiserror::Error)]
14#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
15pub enum Error {
16	/// Database error.
17	#[error("Database error: {0:?}")]
18	Db(String),
19	/// Decoding error
20	#[error("Decoding error: {0:?}")]
21	Decode(String),
22	/// Error reading from storage.
23	#[error("Storage error: {0:?}")]
24	Storage(String),
25}
26
27/// Filter for subscribing to statements with different topics.
28#[derive(Debug, Clone)]
29#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
30#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
31pub enum TopicFilter {
32	/// Matches all topics.
33	Any,
34	/// Matches only statements including all of the given topics.
35	/// Bytes are expected to be a 32-byte topic. Up to [`MAX_TOPICS`] topics can be provided.
36	MatchAll(BoundedVec<Topic, ConstU32<{ MAX_TOPICS as u32 }>>),
37	/// Matches statements including any of the given topics.
38	/// Bytes are expected to be a 32-byte topic. Up to [`MAX_ANY_TOPICS`] topics can be provided.
39	MatchAny(BoundedVec<Topic, ConstU32<{ MAX_ANY_TOPICS as u32 }>>),
40}
41
42/// Topic filter for statement subscriptions, optimized for matching.
43#[derive(Clone, Debug)]
44pub enum OptimizedTopicFilter {
45	/// Matches all topics.
46	Any,
47	/// Matches only statements including all of the given topics.
48	/// Up to `4` topics can be provided.
49	MatchAll(HashSet<Topic>),
50	/// Matches statements including any of the given topics.
51	/// Up to `128` topics can be provided.
52	MatchAny(HashSet<Topic>),
53}
54
55impl OptimizedTopicFilter {
56	/// Check if the statement matches the filter.
57	pub fn matches(&self, statement: &Statement) -> bool {
58		match self {
59			OptimizedTopicFilter::Any => true,
60			OptimizedTopicFilter::MatchAll(topics) => {
61				statement.topics().iter().filter(|topic| topics.contains(*topic)).count()
62					== topics.len()
63			},
64			OptimizedTopicFilter::MatchAny(topics) => {
65				statement.topics().iter().any(|topic| topics.contains(topic))
66			},
67		}
68	}
69}
70
71// Convert TopicFilter to CheckedTopicFilter.
72impl From<TopicFilter> for OptimizedTopicFilter {
73	fn from(filter: TopicFilter) -> Self {
74		match filter {
75			TopicFilter::Any => OptimizedTopicFilter::Any,
76			TopicFilter::MatchAll(topics) => {
77				let mut parsed_topics = HashSet::with_capacity(topics.len());
78				for topic in topics {
79					parsed_topics.insert(topic);
80				}
81				OptimizedTopicFilter::MatchAll(parsed_topics)
82			},
83			TopicFilter::MatchAny(topics) => {
84				let mut parsed_topics = HashSet::with_capacity(topics.len());
85				for topic in topics {
86					parsed_topics.insert(topic);
87				}
88				OptimizedTopicFilter::MatchAny(parsed_topics)
89			},
90		}
91	}
92}
93
/// Reason why a statement was rejected from the store.
///
/// With the `serde` feature enabled the variant is carried in a `reason` tag, with names in
/// camelCase.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "reason", rename_all = "camelCase"))]
pub enum RejectionReason {
	/// Statement data exceeds the maximum allowed size for the account.
	DataTooLarge {
		/// The size of the submitted statement data.
		submitted_size: usize,
		/// Still available data size for the account.
		available_size: usize,
	},
	/// Attempting to replace a channel message with lower or equal expiry.
	ChannelPriorityTooLow {
		/// The expiry of the submitted statement.
		submitted_expiry: u64,
		/// The minimum expiry of the existing channel message.
		min_expiry: u64,
	},
	/// Account reached its statement limit and submitted expiry is too low to evict existing.
	AccountFull {
		/// The expiry of the submitted statement.
		submitted_expiry: u64,
		/// The minimum expiry of the existing statement.
		min_expiry: u64,
	},
	/// The global statement store is full and cannot accept new statements.
	StoreFull,
	/// Account has no allowance set.
	NoAllowance,
}
125
126impl RejectionReason {
127	/// Returns a short string label suitable for use in metrics.
128	pub fn label(&self) -> &'static str {
129		match self {
130			RejectionReason::DataTooLarge { .. } => "data_too_large",
131			RejectionReason::ChannelPriorityTooLow { .. } => "channel_priority_too_low",
132			RejectionReason::AccountFull { .. } => "account_full",
133			RejectionReason::StoreFull => "store_full",
134			RejectionReason::NoAllowance => "no_allowance",
135		}
136	}
137}
138
/// Reason why a statement failed validation.
///
/// With the `serde` feature enabled the variant is carried in a `reason` tag, with names in
/// camelCase.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "reason", rename_all = "camelCase"))]
pub enum InvalidReason {
	/// Statement has no proof.
	NoProof,
	/// Proof validation failed.
	BadProof,
	/// Statement exceeds max allowed statement size.
	EncodingTooLarge {
		/// The size of the submitted statement encoding.
		submitted_size: usize,
		/// The maximum allowed size.
		max_size: usize,
	},
	/// Statement has already expired. The expiry field is in the past.
	AlreadyExpired,
}
158
159/// Statement submission outcome
160#[derive(Debug, Clone, Eq, PartialEq)]
161#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
162#[cfg_attr(feature = "serde", serde(tag = "status", rename_all = "camelCase"))]
163pub enum SubmitResult {
164	/// Statement was accepted as new.
165	New,
166	/// Statement was already known.
167	Known,
168	/// Statement was already known but has expired.
169	KnownExpired,
170	/// Statement was rejected because the store is full or priority is too low.
171	Rejected(RejectionReason),
172	/// Statement failed validation.
173	Invalid(InvalidReason),
174	/// Internal store error.
175	InternalError(Error),
176}
177
178/// An item returned by the statement subscription stream.
179#[derive(Debug, Clone, Eq, PartialEq)]
180#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
181#[cfg_attr(feature = "serde", serde(tag = "event", content = "data", rename_all = "camelCase"))]
182pub enum StatementEvent {
183	/// A batch of statements matching the subscription filter.
184	NewStatements {
185		/// A batch of statements matching the subscription filter, each entry is a SCALE-encoded
186		/// statement.
187		statements: Vec<Bytes>,
188		/// An optional count of how many more matching statements are in the store after this
189		/// batch. This guarantees to the client that it will receive at least this many more
190		/// statements in the subscription stream, but it may receive more if new statements are
191		/// added to the store that match the filter.
192		#[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
193		remaining: Option<u32>,
194	},
195}
196
197/// Result type for `Error`
198pub type Result<T> = std::result::Result<T, Error>;
199
/// Decision returned by the filter used in [`StatementStore::statements_by_hashes`].
///
/// Returned once per visited statement to say whether it is kept and whether iteration goes on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterDecision {
	/// Skip this statement, continue to next.
	Skip,
	/// Take this statement, continue to next.
	Take,
	/// Stop iteration, return collected statements.
	Abort,
}
210
211/// Statement store API.
212pub trait StatementStore: Send + Sync {
213	/// Return all statements.
214	fn statements(&self) -> Result<Vec<(Hash, Statement)>>;
215
216	/// Return recent statements and clear the internal index.
217	///
218	/// This consumes and clears the recently received statements,
219	/// allowing new statements to be collected from this point forward.
220	fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>>;
221
222	/// Get statement by hash.
223	fn statement(&self, hash: &Hash) -> Result<Option<Statement>>;
224
225	/// Check if statement exists in the store
226	///
227	/// Fast index check without accessing the DB.
228	fn has_statement(&self, hash: &Hash) -> bool;
229
230	/// Return all statement hashes.
231	fn statement_hashes(&self) -> Vec<Hash>;
232
233	/// Fetch statements by their hashes with a filter callback.
234	///
235	/// The callback receives (hash, encoded_bytes, decoded_statement) and returns:
236	/// - `Skip`: ignore this statement, continue to next
237	/// - `Take`: include this statement in the result, continue to next
238	/// - `Abort`: stop iteration, return collected statements so far
239	///
240	/// Returns (statements, number_of_hashes_processed).
241	fn statements_by_hashes(
242		&self,
243		hashes: &[Hash],
244		filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
245	) -> Result<(Vec<(Hash, Statement)>, usize)>;
246
247	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
248	/// field.
249	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
250
251	/// Return the data of all known statements whose decryption key is identified as `dest` (this
252	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
253	/// private key for symmetric ciphers).
254	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
255
256	/// Return the decrypted data of all known statements whose decryption key is identified as
257	/// `dest`. The key must be available to the client.
258	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
259
260	/// Return all known statements which include all topics and have no `DecryptionKey`
261	/// field.
262	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
263
264	/// Return all known statements whose decryption key is identified as `dest` (this
265	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
266	/// private key for symmetric ciphers).
267	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
268
269	/// Return the statement and the decrypted data of all known statements whose decryption key is
270	/// identified as `dest`. The key must be available to the client.
271	///
272	/// The result is for each statement: the SCALE-encoded statement concatenated to the
273	/// decrypted data.
274	fn posted_clear_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32])
275		-> Result<Vec<Vec<u8>>>;
276
277	/// Submit a statement.
278	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult;
279
280	/// Remove a statement from the store.
281	fn remove(&self, hash: &Hash) -> Result<()>;
282
283	/// Remove all statements authored by `who`.
284	fn remove_by(&self, who: [u8; 32]) -> Result<()>;
285}