sp_statement_store/store_api.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18pub use crate::runtime_api::StatementSource;
19use crate::{Hash, Statement, Topic, MAX_ANY_TOPICS, MAX_TOPICS};
20use sp_core::{bounded_vec::BoundedVec, Bytes, ConstU32};
21use std::collections::HashSet;
22
23/// Statement store error.
24#[derive(Debug, Clone, Eq, PartialEq, thiserror::Error)]
25#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
26pub enum Error {
27 /// Database error.
28 #[error("Database error: {0:?}")]
29 Db(String),
30 /// Decoding error
31 #[error("Decoding error: {0:?}")]
32 Decode(String),
33 /// Error reading from storage.
34 #[error("Storage error: {0:?}")]
35 Storage(String),
36 /// Invalid configuration.
37 #[error("Invalid configuration: {0}")]
38 InvalidConfig(String),
39}
40
41/// Filter for subscribing to statements with different topics.
42#[derive(Debug, Clone)]
43#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
44#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
45pub enum TopicFilter {
46 /// Matches all topics.
47 Any,
48 /// Matches only statements including all of the given topics.
49 /// Bytes are expected to be a 32-byte topic. Up to [`MAX_TOPICS`] topics can be provided.
50 MatchAll(BoundedVec<Topic, ConstU32<{ MAX_TOPICS as u32 }>>),
51 /// Matches statements including any of the given topics.
52 /// Bytes are expected to be a 32-byte topic. Up to [`MAX_ANY_TOPICS`] topics can be provided.
53 MatchAny(BoundedVec<Topic, ConstU32<{ MAX_ANY_TOPICS as u32 }>>),
54}
55
56/// Topic filter for statement subscriptions, optimized for matching.
57#[derive(Clone, Debug)]
58pub enum OptimizedTopicFilter {
59 /// Matches all topics.
60 Any,
61 /// Matches only statements including all of the given topics.
62 /// Up to `4` topics can be provided.
63 MatchAll(HashSet<Topic>),
64 /// Matches statements including any of the given topics.
65 /// Up to `128` topics can be provided.
66 MatchAny(HashSet<Topic>),
67}
68
69impl OptimizedTopicFilter {
70 /// Check if the statement matches the filter.
71 pub fn matches(&self, statement: &Statement) -> bool {
72 match self {
73 OptimizedTopicFilter::Any => true,
74 OptimizedTopicFilter::MatchAll(topics) => {
75 statement.topics().iter().filter(|topic| topics.contains(*topic)).count() ==
76 topics.len()
77 },
78 OptimizedTopicFilter::MatchAny(topics) => {
79 statement.topics().iter().any(|topic| topics.contains(topic))
80 },
81 }
82 }
83}
84
85// Convert TopicFilter to CheckedTopicFilter.
86impl From<TopicFilter> for OptimizedTopicFilter {
87 fn from(filter: TopicFilter) -> Self {
88 match filter {
89 TopicFilter::Any => OptimizedTopicFilter::Any,
90 TopicFilter::MatchAll(topics) => {
91 let mut parsed_topics = HashSet::with_capacity(topics.len());
92 for topic in topics {
93 parsed_topics.insert(topic);
94 }
95 OptimizedTopicFilter::MatchAll(parsed_topics)
96 },
97 TopicFilter::MatchAny(topics) => {
98 let mut parsed_topics = HashSet::with_capacity(topics.len());
99 for topic in topics {
100 parsed_topics.insert(topic);
101 }
102 OptimizedTopicFilter::MatchAny(parsed_topics)
103 },
104 }
105 }
106}
107
/// Why the store rejected a statement.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(tag = "reason", rename_all = "camelCase")
)]
pub enum RejectionReason {
    /// The statement data is larger than what the account is still allowed to store.
    DataTooLarge {
        /// Size of the submitted statement data.
        submitted_size: usize,
        /// Data size still available to the account.
        available_size: usize,
    },
    /// The statement tried to replace a channel message but its expiry is lower than or equal
    /// to the existing one.
    ChannelPriorityTooLow {
        /// Expiry of the submitted statement.
        submitted_expiry: u64,
        /// Minimum expiry of the existing channel message.
        min_expiry: u64,
    },
    /// The account has reached its statement limit and the submitted expiry is too low to
    /// evict an existing statement.
    AccountFull {
        /// Expiry of the submitted statement.
        submitted_expiry: u64,
        /// Minimum expiry of the existing statement.
        min_expiry: u64,
    },
    /// The global statement store is full and cannot accept new statements.
    StoreFull,
    /// No allowance is set for the account.
    NoAllowance,
}
139
140impl RejectionReason {
141 /// Returns a short string label suitable for use in metrics.
142 pub fn label(&self) -> &'static str {
143 match self {
144 RejectionReason::DataTooLarge { .. } => "data_too_large",
145 RejectionReason::ChannelPriorityTooLow { .. } => "channel_priority_too_low",
146 RejectionReason::AccountFull { .. } => "account_full",
147 RejectionReason::StoreFull => "store_full",
148 RejectionReason::NoAllowance => "no_allowance",
149 }
150 }
151}
152
/// Why a statement failed validation.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(tag = "reason", rename_all = "camelCase")
)]
pub enum InvalidReason {
    /// The statement carries no proof.
    NoProof,
    /// The proof did not pass validation.
    BadProof,
    /// The statement is larger than the maximum allowed statement size.
    EncodingTooLarge {
        /// Size of the submitted statement encoding.
        submitted_size: usize,
        /// Maximum allowed size.
        max_size: usize,
    },
    /// The statement has already expired: its expiry field is in the past.
    AlreadyExpired,
}
172
173impl InvalidReason {
174 /// Returns a short string label suitable for use in metrics.
175 pub fn label(&self) -> &'static str {
176 match self {
177 InvalidReason::NoProof => "no_proof",
178 InvalidReason::BadProof => "bad_proof",
179 InvalidReason::EncodingTooLarge { .. } => "encoding_too_large",
180 InvalidReason::AlreadyExpired => "already_expired",
181 }
182 }
183}
184
185/// Statement submission outcome
186#[derive(Debug, Clone, Eq, PartialEq)]
187#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
188#[cfg_attr(feature = "serde", serde(tag = "status", rename_all = "camelCase"))]
189pub enum SubmitResult {
190 /// Statement was accepted as new.
191 New,
192 /// Statement was already known.
193 Known,
194 /// Statement was already known but has expired.
195 KnownExpired,
196 /// Statement was rejected because the store is full or priority is too low.
197 Rejected(RejectionReason),
198 /// Statement failed validation.
199 Invalid(InvalidReason),
200 /// Internal store error.
201 InternalError(Error),
202}
203
204/// An item returned by the statement subscription stream.
205#[derive(Debug, Clone, Eq, PartialEq)]
206#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
207#[cfg_attr(feature = "serde", serde(tag = "event", content = "data", rename_all = "camelCase"))]
208pub enum StatementEvent {
209 /// A batch of statements matching the subscription filter.
210 NewStatements {
211 /// A batch of statements matching the subscription filter, each entry is a SCALE-encoded
212 /// statement.
213 statements: Vec<Bytes>,
214 /// An optional count of how many more matching statements are in the store after this
215 /// batch. This guarantees to the client that it will receive at least this many more
216 /// statements in the subscription stream, but it may receive more if new statements are
217 /// added to the store that match the filter.
218 #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
219 remaining: Option<u32>,
220 },
221}
222
223/// Result type for `Error`
224pub type Result<T> = std::result::Result<T, Error>;
225
/// Verdict returned by the filter callback passed to [`StatementStore::statements_by_hashes`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FilterDecision {
    /// Leave this statement out and move on to the next one.
    Skip,
    /// Keep this statement and move on to the next one.
    Take,
    /// Stop iterating and return the statements collected so far.
    Abort,
}
236
237/// Statement store API.
238pub trait StatementStore: Send + Sync {
239 /// Return all statements.
240 fn statements(&self) -> Result<Vec<(Hash, Statement)>>;
241
242 /// Return recent statements and clear the internal index.
243 ///
244 /// This consumes and clears the recently received statements,
245 /// allowing new statements to be collected from this point forward.
246 fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>>;
247
248 /// Get statement by hash.
249 fn statement(&self, hash: &Hash) -> Result<Option<Statement>>;
250
251 /// Check if statement exists in the store
252 ///
253 /// Fast index check without accessing the DB.
254 fn has_statement(&self, hash: &Hash) -> bool;
255
256 /// Return all statement hashes.
257 fn statement_hashes(&self) -> Vec<Hash>;
258
259 /// Fetch statements by their hashes with a filter callback.
260 ///
261 /// The callback receives (hash, encoded_bytes, decoded_statement) and returns:
262 /// - `Skip`: ignore this statement, continue to next
263 /// - `Take`: include this statement in the result, continue to next
264 /// - `Abort`: stop iteration, return collected statements so far
265 ///
266 /// Returns (statements, number_of_hashes_processed).
267 fn statements_by_hashes(
268 &self,
269 hashes: &[Hash],
270 filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
271 ) -> Result<(Vec<(Hash, Statement)>, usize)>;
272
273 /// Return the data of all known statements which include all topics and have no `DecryptionKey`
274 /// field.
275 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
276
277 /// Return the data of all known statements whose decryption key is identified as `dest` (this
278 /// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
279 /// private key for symmetric ciphers).
280 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
281
282 /// Return the decrypted data of all known statements whose decryption key is identified as
283 /// `dest`. The key must be available to the client.
284 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
285
286 /// Return all known statements which include all topics and have no `DecryptionKey`
287 /// field.
288 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
289
290 /// Return all known statements whose decryption key is identified as `dest` (this
291 /// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
292 /// private key for symmetric ciphers).
293 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
294
295 /// Return the statement and the decrypted data of all known statements whose decryption key is
296 /// identified as `dest`. The key must be available to the client.
297 ///
298 /// The result is for each statement: the SCALE-encoded statement concatenated to the
299 /// decrypted data.
300 fn posted_clear_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32])
301 -> Result<Vec<Vec<u8>>>;
302
303 /// Submit a statement.
304 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult;
305
306 /// Remove a statement from the store.
307 fn remove(&self, hash: &Hash) -> Result<()>;
308
309 /// Remove all statements authored by `who`.
310 fn remove_by(&self, who: [u8; 32]) -> Result<()>;
311}