soil_statement_store/store_api.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: Apache-2.0 OR GPL-3.0-or-later WITH Classpath-exception-2.0
6
7pub use crate::runtime_api::StatementSource;
8use crate::{Hash, Statement, Topic, MAX_ANY_TOPICS, MAX_TOPICS};
9use std::collections::HashSet;
10use subsoil::core::{bounded_vec::BoundedVec, Bytes, ConstU32};
11
/// Statement store error.
///
/// Every variant wraps a `String` payload. The `Display` message renders that payload with
/// `{0:?}`, i.e. using the string's `Debug` form (quoted and escaped).
#[derive(Debug, Clone, Eq, PartialEq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Error {
    /// Database error.
    #[error("Database error: {0:?}")]
    Db(String),
    /// Decoding error.
    #[error("Decoding error: {0:?}")]
    Decode(String),
    /// Error reading from storage.
    #[error("Storage error: {0:?}")]
    Storage(String),
}
26
/// Filter for subscribing to statements with different topics.
///
/// The topic lists are bounded at the type level via [`BoundedVec`]. With the `serde` feature
/// enabled, variant names are (de)serialized in `camelCase`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub enum TopicFilter {
    /// Matches all topics, i.e. no topic restriction is applied.
    Any,
    /// Matches only statements including all of the given topics.
    /// Bytes are expected to be a 32-byte topic. Up to [`MAX_TOPICS`] topics can be provided.
    MatchAll(BoundedVec<Topic, ConstU32<{ MAX_TOPICS as u32 }>>),
    /// Matches statements including any of the given topics.
    /// Bytes are expected to be a 32-byte topic. Up to [`MAX_ANY_TOPICS`] topics can be provided.
    MatchAny(BoundedVec<Topic, ConstU32<{ MAX_ANY_TOPICS as u32 }>>),
}
41
/// Topic filter for statement subscriptions, optimized for matching.
///
/// Topics are kept in a [`HashSet`] so that membership can be tested by hash lookup and
/// duplicate topics are stored only once.
#[derive(Clone, Debug)]
pub enum OptimizedTopicFilter {
    /// Matches all topics.
    Any,
    /// Matches only statements including all of the given topics.
    /// Up to `4` topics can be provided.
    // NOTE(review): the set itself does not enforce this limit; the figure presumably mirrors
    // `MAX_TOPICS` -- confirm.
    MatchAll(HashSet<Topic>),
    /// Matches statements including any of the given topics.
    /// Up to `128` topics can be provided.
    // NOTE(review): the set itself does not enforce this limit; the figure presumably mirrors
    // `MAX_ANY_TOPICS` -- confirm.
    MatchAny(HashSet<Topic>),
}
54
55impl OptimizedTopicFilter {
56 /// Check if the statement matches the filter.
57 pub fn matches(&self, statement: &Statement) -> bool {
58 match self {
59 OptimizedTopicFilter::Any => true,
60 OptimizedTopicFilter::MatchAll(topics) => {
61 statement.topics().iter().filter(|topic| topics.contains(*topic)).count()
62 == topics.len()
63 },
64 OptimizedTopicFilter::MatchAny(topics) => {
65 statement.topics().iter().any(|topic| topics.contains(topic))
66 },
67 }
68 }
69}
70
71// Convert TopicFilter to CheckedTopicFilter.
72impl From<TopicFilter> for OptimizedTopicFilter {
73 fn from(filter: TopicFilter) -> Self {
74 match filter {
75 TopicFilter::Any => OptimizedTopicFilter::Any,
76 TopicFilter::MatchAll(topics) => {
77 let mut parsed_topics = HashSet::with_capacity(topics.len());
78 for topic in topics {
79 parsed_topics.insert(topic);
80 }
81 OptimizedTopicFilter::MatchAll(parsed_topics)
82 },
83 TopicFilter::MatchAny(topics) => {
84 let mut parsed_topics = HashSet::with_capacity(topics.len());
85 for topic in topics {
86 parsed_topics.insert(topic);
87 }
88 OptimizedTopicFilter::MatchAny(parsed_topics)
89 },
90 }
91 }
92}
93
/// Reason why a statement was rejected from the store.
///
/// With the `serde` feature enabled the enum is internally tagged: the variant name is written
/// in `camelCase` under the `reason` key.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "reason", rename_all = "camelCase"))]
pub enum RejectionReason {
    /// Statement data exceeds the maximum allowed size for the account.
    DataTooLarge {
        /// The size of the submitted statement data.
        submitted_size: usize,
        /// Still available data size for the account.
        available_size: usize,
    },
    /// Attempting to replace a channel message with lower or equal expiry.
    ChannelPriorityTooLow {
        /// The expiry of the submitted statement.
        submitted_expiry: u64,
        /// The minimum expiry of the existing channel message.
        min_expiry: u64,
    },
    /// Account reached its statement limit and submitted expiry is too low to evict existing.
    AccountFull {
        /// The expiry of the submitted statement.
        submitted_expiry: u64,
        /// The minimum expiry of the existing statement.
        min_expiry: u64,
    },
    /// The global statement store is full and cannot accept new statements.
    StoreFull,
    /// Account has no allowance set.
    NoAllowance,
}
125
126impl RejectionReason {
127 /// Returns a short string label suitable for use in metrics.
128 pub fn label(&self) -> &'static str {
129 match self {
130 RejectionReason::DataTooLarge { .. } => "data_too_large",
131 RejectionReason::ChannelPriorityTooLow { .. } => "channel_priority_too_low",
132 RejectionReason::AccountFull { .. } => "account_full",
133 RejectionReason::StoreFull => "store_full",
134 RejectionReason::NoAllowance => "no_allowance",
135 }
136 }
137}
138
/// Reason why a statement failed validation.
///
/// With the `serde` feature enabled the enum is internally tagged: the variant name is written
/// in `camelCase` under the `reason` key.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "reason", rename_all = "camelCase"))]
pub enum InvalidReason {
    /// Statement has no proof.
    NoProof,
    /// Proof validation failed.
    BadProof,
    /// Statement exceeds max allowed statement size.
    EncodingTooLarge {
        /// The size of the submitted statement encoding.
        submitted_size: usize,
        /// The maximum allowed size.
        max_size: usize,
    },
    /// Statement has already expired. The expiry field is in the past.
    AlreadyExpired,
}
158
/// Statement submission outcome.
///
/// With the `serde` feature enabled the enum is internally tagged: the variant name is written
/// in `camelCase` under the `status` key.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "status", rename_all = "camelCase"))]
pub enum SubmitResult {
    /// Statement was accepted as new.
    New,
    /// Statement was already known.
    Known,
    /// Statement was already known but has expired.
    KnownExpired,
    /// Statement was rejected by the store; the payload carries the specific
    /// [`RejectionReason`] (for example the store is full or the priority is too low).
    Rejected(RejectionReason),
    /// Statement failed validation; the payload carries the specific [`InvalidReason`].
    Invalid(InvalidReason),
    /// Internal store error.
    InternalError(Error),
}
177
/// An item returned by the statement subscription stream.
///
/// With the `serde` feature enabled the enum is adjacently tagged: the `camelCase` variant name
/// is written under the `event` key and the variant's content under the `data` key.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "event", content = "data", rename_all = "camelCase"))]
pub enum StatementEvent {
    /// A batch of statements matching the subscription filter.
    NewStatements {
        /// A batch of statements matching the subscription filter, each entry is a SCALE-encoded
        /// statement.
        statements: Vec<Bytes>,
        /// An optional count of how many more matching statements are in the store after this
        /// batch. This guarantees to the client that it will receive at least this many more
        /// statements in the subscription stream, but it may receive more if new statements are
        /// added to the store that match the filter.
        ///
        /// When serialized, the field is omitted if it is `None`; when deserialized, a missing
        /// field is read as `None`.
        #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
        remaining: Option<u32>,
    },
}
196
/// Convenience alias for [`std::result::Result`] with the error type fixed to [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
199
/// Decision returned by the filter used in [`StatementStore::statements_by_hashes`].
///
/// The filter callback is invoked per statement and steers both whether that statement is
/// collected and whether iteration continues.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterDecision {
    /// Skip this statement, continue to next.
    Skip,
    /// Take this statement, continue to next.
    Take,
    /// Stop iteration, return collected statements.
    Abort,
}
210
/// Statement store API.
///
/// Implementations must be `Send + Sync`. Every method takes `&self`, including the ones
/// documented as modifying the store (`take_recent_statements`, `submit`, `remove`,
/// `remove_by`).
pub trait StatementStore: Send + Sync {
    /// Return all statements.
    fn statements(&self) -> Result<Vec<(Hash, Statement)>>;

    /// Return recent statements and clear the internal index.
    ///
    /// This consumes and clears the recently received statements,
    /// allowing new statements to be collected from this point forward.
    fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>>;

    /// Get statement by hash.
    fn statement(&self, hash: &Hash) -> Result<Option<Statement>>;

    /// Check if statement exists in the store.
    ///
    /// Fast index check without accessing the DB.
    fn has_statement(&self, hash: &Hash) -> bool;

    /// Return all statement hashes.
    fn statement_hashes(&self) -> Vec<Hash>;

    /// Fetch statements by their hashes with a filter callback.
    ///
    /// The callback receives (hash, encoded_bytes, decoded_statement) and returns:
    /// - `Skip`: ignore this statement, continue to next
    /// - `Take`: include this statement in the result, continue to next
    /// - `Abort`: stop iteration, return collected statements so far
    ///
    /// Returns (statements, number_of_hashes_processed).
    fn statements_by_hashes(
        &self,
        hashes: &[Hash],
        filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
    ) -> Result<(Vec<(Hash, Statement)>, usize)>;

    /// Return the data of all known statements which include all topics and have no `DecryptionKey`
    /// field.
    fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;

    /// Return the data of all known statements whose decryption key is identified as `dest` (this
    /// will generally be the public key or a hash thereof for asymmetric ciphers, or a hash of the
    /// private key for symmetric ciphers).
    fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;

    /// Return the decrypted data of all known statements whose decryption key is identified as
    /// `dest`. The key must be available to the client.
    fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;

    /// Return all known statements which include all topics and have no `DecryptionKey`
    /// field.
    fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;

    /// Return all known statements whose decryption key is identified as `dest` (this
    /// will generally be the public key or a hash thereof for asymmetric ciphers, or a hash of the
    /// private key for symmetric ciphers).
    fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;

    /// Return the statement and the decrypted data of all known statements whose decryption key is
    /// identified as `dest`. The key must be available to the client.
    ///
    /// The result is for each statement: the SCALE-encoded statement concatenated to the
    /// decrypted data.
    fn posted_clear_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32])
        -> Result<Vec<Vec<u8>>>;

    /// Submit a statement.
    ///
    /// `source` identifies where the statement came from; the outcome is reported through the
    /// returned [`SubmitResult`] rather than through [`Result`].
    fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult;

    /// Remove a statement from the store.
    fn remove(&self, hash: &Hash) -> Result<()>;

    /// Remove all statements authored by `who`.
    fn remove_by(&self, who: [u8; 32]) -> Result<()>;
}
285}