sp_statement_store/store_api.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18pub use crate::runtime_api::StatementSource;
19use crate::{Hash, Statement, Topic, MAX_ANY_TOPICS, MAX_TOPICS};
20use sp_core::{bounded_vec::BoundedVec, Bytes, ConstU32};
21use std::collections::HashSet;
22
23/// Statement store error.
24#[derive(Debug, Clone, Eq, PartialEq, thiserror::Error)]
25#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
26pub enum Error {
27 /// Database error.
28 #[error("Database error: {0:?}")]
29 Db(String),
30 /// Decoding error
31 #[error("Decoding error: {0:?}")]
32 Decode(String),
33 /// Error reading from storage.
34 #[error("Storage error: {0:?}")]
35 Storage(String),
36 /// Invalid configuration.
37 #[error("Invalid configuration: {0}")]
38 InvalidConfig(String),
39}
40
41/// Filter for subscribing to statements with different topics.
42#[derive(Debug, Clone)]
43#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
44#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
45pub enum TopicFilter {
46 /// Matches all topics.
47 Any,
48 /// Matches only statements including all of the given topics.
49 /// Bytes are expected to be a 32-byte topic. Up to [`MAX_TOPICS`] topics can be provided.
50 MatchAll(BoundedVec<Topic, ConstU32<{ MAX_TOPICS as u32 }>>),
51 /// Matches statements including any of the given topics.
52 /// Bytes are expected to be a 32-byte topic. Up to [`MAX_ANY_TOPICS`] topics can be provided.
53 MatchAny(BoundedVec<Topic, ConstU32<{ MAX_ANY_TOPICS as u32 }>>),
54}
55
56/// Topic filter for statement subscriptions, optimized for matching.
57#[derive(Clone, Debug)]
58pub enum OptimizedTopicFilter {
59 /// Matches all topics.
60 Any,
61 /// Matches only statements including all of the given topics.
62 /// Up to `4` topics can be provided.
63 MatchAll(HashSet<Topic>),
64 /// Matches statements including any of the given topics.
65 /// Up to `128` topics can be provided.
66 MatchAny(HashSet<Topic>),
67}
68
69impl OptimizedTopicFilter {
70 /// Check if the statement matches the filter.
71 pub fn matches(&self, statement: &Statement) -> bool {
72 match self {
73 OptimizedTopicFilter::Any => true,
74 OptimizedTopicFilter::MatchAll(topics) => {
75 statement.topics().iter().filter(|topic| topics.contains(*topic)).count() ==
76 topics.len()
77 },
78 OptimizedTopicFilter::MatchAny(topics) => {
79 statement.topics().iter().any(|topic| topics.contains(topic))
80 },
81 }
82 }
83}
84
85// Convert TopicFilter to CheckedTopicFilter.
86impl From<TopicFilter> for OptimizedTopicFilter {
87 fn from(filter: TopicFilter) -> Self {
88 match filter {
89 TopicFilter::Any => OptimizedTopicFilter::Any,
90 TopicFilter::MatchAll(topics) => {
91 let mut parsed_topics = HashSet::with_capacity(topics.len());
92 for topic in topics {
93 parsed_topics.insert(topic);
94 }
95 OptimizedTopicFilter::MatchAll(parsed_topics)
96 },
97 TopicFilter::MatchAny(topics) => {
98 let mut parsed_topics = HashSet::with_capacity(topics.len());
99 for topic in topics {
100 parsed_topics.insert(topic);
101 }
102 OptimizedTopicFilter::MatchAny(parsed_topics)
103 },
104 }
105 }
106}
107
108/// Reason why a statement was rejected from the store.
109#[derive(Debug, Clone, Eq, PartialEq)]
110#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
111#[cfg_attr(feature = "serde", serde(tag = "reason", rename_all = "camelCase"))]
112pub enum RejectionReason {
113 /// Statement data exceeds the maximum allowed size for the account.
114 DataTooLarge {
115 /// The size of the submitted statement data.
116 submitted_size: usize,
117 /// Still available data size for the account.
118 available_size: usize,
119 },
120 /// Attempting to replace a channel message with lower or equal expiry.
121 ChannelPriorityTooLow {
122 /// The expiry of the submitted statement.
123 submitted_expiry: u64,
124 /// The minimum expiry of the existing channel message.
125 min_expiry: u64,
126 },
127 /// Account reached its statement limit and submitted expiry is too low to evict existing.
128 AccountFull {
129 /// The expiry of the submitted statement.
130 submitted_expiry: u64,
131 /// The minimum expiry of the existing statement.
132 min_expiry: u64,
133 },
134 /// The global statement store is full and cannot accept new statements.
135 StoreFull,
136 /// Account has no allowance set.
137 NoAllowance,
138}
139
140impl RejectionReason {
141 /// Returns a short string label suitable for use in metrics.
142 pub fn label(&self) -> &'static str {
143 match self {
144 RejectionReason::DataTooLarge { .. } => "data_too_large",
145 RejectionReason::ChannelPriorityTooLow { .. } => "channel_priority_too_low",
146 RejectionReason::AccountFull { .. } => "account_full",
147 RejectionReason::StoreFull => "store_full",
148 RejectionReason::NoAllowance => "no_allowance",
149 }
150 }
151}
152
153/// Reason why a statement failed validation.
154#[derive(Debug, Clone, Eq, PartialEq)]
155#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
156#[cfg_attr(feature = "serde", serde(tag = "reason", rename_all = "camelCase"))]
157pub enum InvalidReason {
158 /// Statement has no proof.
159 NoProof,
160 /// Proof validation failed.
161 BadProof,
162 /// Statement exceeds max allowed statement size.
163 EncodingTooLarge {
164 /// The size of the submitted statement encoding.
165 submitted_size: usize,
166 /// The maximum allowed size.
167 max_size: usize,
168 },
169 /// Statement has already expired. The expiry field is in the past.
170 AlreadyExpired,
171}
172
173/// Statement submission outcome
174#[derive(Debug, Clone, Eq, PartialEq)]
175#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
176#[cfg_attr(feature = "serde", serde(tag = "status", rename_all = "camelCase"))]
177pub enum SubmitResult {
178 /// Statement was accepted as new.
179 New,
180 /// Statement was already known.
181 Known,
182 /// Statement was already known but has expired.
183 KnownExpired,
184 /// Statement was rejected because the store is full or priority is too low.
185 Rejected(RejectionReason),
186 /// Statement failed validation.
187 Invalid(InvalidReason),
188 /// Internal store error.
189 InternalError(Error),
190}
191
192/// An item returned by the statement subscription stream.
193#[derive(Debug, Clone, Eq, PartialEq)]
194#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
195#[cfg_attr(feature = "serde", serde(tag = "event", content = "data", rename_all = "camelCase"))]
196pub enum StatementEvent {
197 /// A batch of statements matching the subscription filter.
198 NewStatements {
199 /// A batch of statements matching the subscription filter, each entry is a SCALE-encoded
200 /// statement.
201 statements: Vec<Bytes>,
202 /// An optional count of how many more matching statements are in the store after this
203 /// batch. This guarantees to the client that it will receive at least this many more
204 /// statements in the subscription stream, but it may receive more if new statements are
205 /// added to the store that match the filter.
206 #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
207 remaining: Option<u32>,
208 },
209}
210
211/// Result type for `Error`
212pub type Result<T> = std::result::Result<T, Error>;
213
214/// Decision returned by the filter used in [`StatementStore::statements_by_hashes`].
215#[derive(Debug, Clone, Copy, PartialEq, Eq)]
216pub enum FilterDecision {
217 /// Skip this statement, continue to next.
218 Skip,
219 /// Take this statement, continue to next.
220 Take,
221 /// Stop iteration, return collected statements.
222 Abort,
223}
224
225/// Statement store API.
226pub trait StatementStore: Send + Sync {
227 /// Return all statements.
228 fn statements(&self) -> Result<Vec<(Hash, Statement)>>;
229
230 /// Return recent statements and clear the internal index.
231 ///
232 /// This consumes and clears the recently received statements,
233 /// allowing new statements to be collected from this point forward.
234 fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>>;
235
236 /// Get statement by hash.
237 fn statement(&self, hash: &Hash) -> Result<Option<Statement>>;
238
239 /// Check if statement exists in the store
240 ///
241 /// Fast index check without accessing the DB.
242 fn has_statement(&self, hash: &Hash) -> bool;
243
244 /// Return all statement hashes.
245 fn statement_hashes(&self) -> Vec<Hash>;
246
247 /// Fetch statements by their hashes with a filter callback.
248 ///
249 /// The callback receives (hash, encoded_bytes, decoded_statement) and returns:
250 /// - `Skip`: ignore this statement, continue to next
251 /// - `Take`: include this statement in the result, continue to next
252 /// - `Abort`: stop iteration, return collected statements so far
253 ///
254 /// Returns (statements, number_of_hashes_processed).
255 fn statements_by_hashes(
256 &self,
257 hashes: &[Hash],
258 filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
259 ) -> Result<(Vec<(Hash, Statement)>, usize)>;
260
261 /// Return the data of all known statements which include all topics and have no `DecryptionKey`
262 /// field.
263 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
264
265 /// Return the data of all known statements whose decryption key is identified as `dest` (this
266 /// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
267 /// private key for symmetric ciphers).
268 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
269
270 /// Return the decrypted data of all known statements whose decryption key is identified as
271 /// `dest`. The key must be available to the client.
272 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
273
274 /// Return all known statements which include all topics and have no `DecryptionKey`
275 /// field.
276 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
277
278 /// Return all known statements whose decryption key is identified as `dest` (this
279 /// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
280 /// private key for symmetric ciphers).
281 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
282
283 /// Return the statement and the decrypted data of all known statements whose decryption key is
284 /// identified as `dest`. The key must be available to the client.
285 ///
286 /// The result is for each statement: the SCALE-encoded statement concatenated to the
287 /// decrypted data.
288 fn posted_clear_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32])
289 -> Result<Vec<Vec<u8>>>;
290
291 /// Submit a statement.
292 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult;
293
294 /// Remove a statement from the store.
295 fn remove(&self, hash: &Hash) -> Result<()>;
296
297 /// Remove all statements authored by `who`.
298 fn remove_by(&self, who: [u8; 32]) -> Result<()>;
299}