Skip to main content

sp_statement_store/
store_api.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18pub use crate::runtime_api::StatementSource;
19use crate::{Hash, Statement, Topic, MAX_ANY_TOPICS, MAX_TOPICS};
20use sp_core::{bounded_vec::BoundedVec, Bytes, ConstU32};
21use std::collections::HashSet;
22
23/// Statement store error.
24#[derive(Debug, Clone, Eq, PartialEq, thiserror::Error)]
25#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
26pub enum Error {
27	/// Database error.
28	#[error("Database error: {0:?}")]
29	Db(String),
30	/// Decoding error
31	#[error("Decoding error: {0:?}")]
32	Decode(String),
33	/// Error reading from storage.
34	#[error("Storage error: {0:?}")]
35	Storage(String),
36	/// Invalid configuration.
37	#[error("Invalid configuration: {0}")]
38	InvalidConfig(String),
39}
40
41/// Filter for subscribing to statements with different topics.
42#[derive(Debug, Clone)]
43#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
44#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
45pub enum TopicFilter {
46	/// Matches all topics.
47	Any,
48	/// Matches only statements including all of the given topics.
49	/// Bytes are expected to be a 32-byte topic. Up to [`MAX_TOPICS`] topics can be provided.
50	MatchAll(BoundedVec<Topic, ConstU32<{ MAX_TOPICS as u32 }>>),
51	/// Matches statements including any of the given topics.
52	/// Bytes are expected to be a 32-byte topic. Up to [`MAX_ANY_TOPICS`] topics can be provided.
53	MatchAny(BoundedVec<Topic, ConstU32<{ MAX_ANY_TOPICS as u32 }>>),
54}
55
56/// Topic filter for statement subscriptions, optimized for matching.
57#[derive(Clone, Debug)]
58pub enum OptimizedTopicFilter {
59	/// Matches all topics.
60	Any,
61	/// Matches only statements including all of the given topics.
62	/// Up to `4` topics can be provided.
63	MatchAll(HashSet<Topic>),
64	/// Matches statements including any of the given topics.
65	/// Up to `128` topics can be provided.
66	MatchAny(HashSet<Topic>),
67}
68
69impl OptimizedTopicFilter {
70	/// Check if the statement matches the filter.
71	pub fn matches(&self, statement: &Statement) -> bool {
72		match self {
73			OptimizedTopicFilter::Any => true,
74			OptimizedTopicFilter::MatchAll(topics) => {
75				statement.topics().iter().filter(|topic| topics.contains(*topic)).count() ==
76					topics.len()
77			},
78			OptimizedTopicFilter::MatchAny(topics) => {
79				statement.topics().iter().any(|topic| topics.contains(topic))
80			},
81		}
82	}
83}
84
85// Convert TopicFilter to CheckedTopicFilter.
86impl From<TopicFilter> for OptimizedTopicFilter {
87	fn from(filter: TopicFilter) -> Self {
88		match filter {
89			TopicFilter::Any => OptimizedTopicFilter::Any,
90			TopicFilter::MatchAll(topics) => {
91				let mut parsed_topics = HashSet::with_capacity(topics.len());
92				for topic in topics {
93					parsed_topics.insert(topic);
94				}
95				OptimizedTopicFilter::MatchAll(parsed_topics)
96			},
97			TopicFilter::MatchAny(topics) => {
98				let mut parsed_topics = HashSet::with_capacity(topics.len());
99				for topic in topics {
100					parsed_topics.insert(topic);
101				}
102				OptimizedTopicFilter::MatchAny(parsed_topics)
103			},
104		}
105	}
106}
107
/// Why the store refused to accept a statement.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
	feature = "serde",
	derive(serde::Serialize, serde::Deserialize),
	serde(tag = "reason", rename_all = "camelCase")
)]
pub enum RejectionReason {
	/// The statement data is larger than the size the account is still allowed to use.
	DataTooLarge {
		/// Size of the data in the submitted statement.
		submitted_size: usize,
		/// Data size that is still available to the account.
		available_size: usize,
	},
	/// A channel message cannot be replaced by a statement with a lower or equal expiry.
	ChannelPriorityTooLow {
		/// Expiry of the submitted statement.
		submitted_expiry: u64,
		/// Minimum expiry of the existing channel message.
		min_expiry: u64,
	},
	/// The account is at its statement limit and the submitted expiry is too low to evict an
	/// existing statement.
	AccountFull {
		/// Expiry of the submitted statement.
		submitted_expiry: u64,
		/// Minimum expiry of the existing statement.
		min_expiry: u64,
	},
	/// The global statement store is full and cannot accept new statements.
	StoreFull,
	/// The account has no allowance set.
	NoAllowance,
}

impl RejectionReason {
	/// Short, static identifier of the rejection reason, suitable for use as a metrics label.
	pub fn label(&self) -> &'static str {
		match self {
			Self::DataTooLarge { .. } => "data_too_large",
			Self::ChannelPriorityTooLow { .. } => "channel_priority_too_low",
			Self::AccountFull { .. } => "account_full",
			Self::StoreFull => "store_full",
			Self::NoAllowance => "no_allowance",
		}
	}
}
152
/// Why a statement did not pass validation.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
	feature = "serde",
	derive(serde::Serialize, serde::Deserialize),
	serde(tag = "reason", rename_all = "camelCase")
)]
pub enum InvalidReason {
	/// The statement carries no proof.
	NoProof,
	/// The proof did not validate.
	BadProof,
	/// The encoded statement is larger than the maximum allowed statement size.
	EncodingTooLarge {
		/// Size of the submitted statement encoding.
		submitted_size: usize,
		/// Maximum size that is allowed.
		max_size: usize,
	},
	/// The statement has already expired: its expiry field is in the past.
	AlreadyExpired,
}
172
173/// Statement submission outcome
174#[derive(Debug, Clone, Eq, PartialEq)]
175#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
176#[cfg_attr(feature = "serde", serde(tag = "status", rename_all = "camelCase"))]
177pub enum SubmitResult {
178	/// Statement was accepted as new.
179	New,
180	/// Statement was already known.
181	Known,
182	/// Statement was already known but has expired.
183	KnownExpired,
184	/// Statement was rejected because the store is full or priority is too low.
185	Rejected(RejectionReason),
186	/// Statement failed validation.
187	Invalid(InvalidReason),
188	/// Internal store error.
189	InternalError(Error),
190}
191
192/// An item returned by the statement subscription stream.
193#[derive(Debug, Clone, Eq, PartialEq)]
194#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
195#[cfg_attr(feature = "serde", serde(tag = "event", content = "data", rename_all = "camelCase"))]
196pub enum StatementEvent {
197	/// A batch of statements matching the subscription filter.
198	NewStatements {
199		/// A batch of statements matching the subscription filter, each entry is a SCALE-encoded
200		/// statement.
201		statements: Vec<Bytes>,
202		/// An optional count of how many more matching statements are in the store after this
203		/// batch. This guarantees to the client that it will receive at least this many more
204		/// statements in the subscription stream, but it may receive more if new statements are
205		/// added to the store that match the filter.
206		#[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
207		remaining: Option<u32>,
208	},
209}
210
/// Convenience alias for a [`std::result::Result`] whose error type is the statement store
/// [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
213
/// Verdict that the filter callback of [`StatementStore::statements_by_hashes`] returns for each
/// statement it is shown.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FilterDecision {
	/// Leave this statement out and continue with the next one.
	Skip,
	/// Include this statement and continue with the next one.
	Take,
	/// Stop iterating and return the statements collected so far.
	Abort,
}
224
225/// Statement store API.
226pub trait StatementStore: Send + Sync {
227	/// Return all statements.
228	fn statements(&self) -> Result<Vec<(Hash, Statement)>>;
229
230	/// Return recent statements and clear the internal index.
231	///
232	/// This consumes and clears the recently received statements,
233	/// allowing new statements to be collected from this point forward.
234	fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>>;
235
236	/// Get statement by hash.
237	fn statement(&self, hash: &Hash) -> Result<Option<Statement>>;
238
239	/// Check if statement exists in the store
240	///
241	/// Fast index check without accessing the DB.
242	fn has_statement(&self, hash: &Hash) -> bool;
243
244	/// Return all statement hashes.
245	fn statement_hashes(&self) -> Vec<Hash>;
246
247	/// Fetch statements by their hashes with a filter callback.
248	///
249	/// The callback receives (hash, encoded_bytes, decoded_statement) and returns:
250	/// - `Skip`: ignore this statement, continue to next
251	/// - `Take`: include this statement in the result, continue to next
252	/// - `Abort`: stop iteration, return collected statements so far
253	///
254	/// Returns (statements, number_of_hashes_processed).
255	fn statements_by_hashes(
256		&self,
257		hashes: &[Hash],
258		filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
259	) -> Result<(Vec<(Hash, Statement)>, usize)>;
260
261	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
262	/// field.
263	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
264
265	/// Return the data of all known statements whose decryption key is identified as `dest` (this
266	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
267	/// private key for symmetric ciphers).
268	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
269
270	/// Return the decrypted data of all known statements whose decryption key is identified as
271	/// `dest`. The key must be available to the client.
272	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
273
274	/// Return all known statements which include all topics and have no `DecryptionKey`
275	/// field.
276	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>>;
277
278	/// Return all known statements whose decryption key is identified as `dest` (this
279	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
280	/// private key for symmetric ciphers).
281	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>>;
282
283	/// Return the statement and the decrypted data of all known statements whose decryption key is
284	/// identified as `dest`. The key must be available to the client.
285	///
286	/// The result is for each statement: the SCALE-encoded statement concatenated to the
287	/// decrypted data.
288	fn posted_clear_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32])
289		-> Result<Vec<Vec<u8>>>;
290
291	/// Submit a statement.
292	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult;
293
294	/// Remove a statement from the store.
295	fn remove(&self, hash: &Hash) -> Result<()>;
296
297	/// Remove all statements authored by `who`.
298	fn remove_by(&self, who: [u8; 32]) -> Result<()>;
299}