holochain_state/
scratch.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use holo_hash::ActionHash;
6use holo_hash::AnyDhtHash;
7use holo_hash::EntryHash;
8use holochain_keystore::KeystoreError;
9use thiserror::Error;
10
11use crate::prelude::*;
12use crate::query::StmtIter;
13
14/// The "scratch" is an in-memory space to stage Actions to be committed at the
15/// end of the CallZome workflow.
16///
17/// This space must also be queryable: specifically, it needs to be combined
18/// into queries into the database which return Actions. This is done by
19/// a simple filter on the scratch space, and then chaining that iterator
20/// onto the iterators over the Actions in the database(s) produced by the
21/// Cascade.
22#[derive(Debug, Clone, Default)]
23pub struct Scratch {
24    actions: Vec<SignedActionHashed>,
25    entries: HashMap<EntryHash, Arc<Entry>>,
26    chain_top_ordering: ChainTopOrdering,
27    scheduled_fns: Vec<ScheduledFn>,
28    chain_head: Option<(u32, usize)>,
29}
30
31#[derive(Debug, Clone)]
32pub struct SyncScratch(Arc<Mutex<Scratch>>);
33
34// MD: hmm, why does this need to be a separate type? Why collect into this?
35pub struct FilteredScratch {
36    actions: Vec<SignedActionHashed>,
37}
38
39impl Scratch {
40    pub fn new() -> Self {
41        Self {
42            chain_top_ordering: ChainTopOrdering::Relaxed,
43            ..Default::default()
44        }
45    }
46
47    pub fn scheduled_fns(&self) -> &[ScheduledFn] {
48        &self.scheduled_fns
49    }
50
51    pub fn add_scheduled_fn(&mut self, scheduled_fn: ScheduledFn) {
52        self.scheduled_fns.push(scheduled_fn)
53    }
54
55    pub fn chain_top_ordering(&self) -> ChainTopOrdering {
56        self.chain_top_ordering
57    }
58
59    pub fn respect_chain_top_ordering(&mut self, chain_top_ordering: ChainTopOrdering) {
60        if chain_top_ordering == ChainTopOrdering::Strict {
61            self.chain_top_ordering = chain_top_ordering;
62        }
63    }
64
65    pub fn add_action(&mut self, item: SignedActionHashed, chain_top_ordering: ChainTopOrdering) {
66        self.respect_chain_top_ordering(chain_top_ordering);
67        let seq = item.action().action_seq();
68        match &mut self.chain_head {
69            Some((h, i)) => {
70                if seq > *h {
71                    *h = seq;
72                    *i = self.actions.len();
73                }
74            }
75            h @ None => *h = Some((seq, self.actions.len())),
76        }
77        self.actions.push(item);
78    }
79
80    pub fn chain_head(&self) -> Option<HeadInfo> {
81        self.chain_head.as_ref().and_then(|(_, i)| {
82            self.actions.get(*i).map(|h| HeadInfo {
83                action: h.action_address().clone(),
84                seq: h.action().action_seq(),
85                timestamp: h.action().timestamp(),
86            })
87        })
88    }
89
90    pub fn add_entry(&mut self, entry_hashed: EntryHashed, chain_top_ordering: ChainTopOrdering) {
91        self.respect_chain_top_ordering(chain_top_ordering);
92        let (entry, hash) = entry_hashed.into_inner();
93        self.entries.insert(hash, Arc::new(entry));
94    }
95
96    pub fn as_filter(&self, f: impl Fn(&SignedActionHashed) -> bool) -> FilteredScratch {
97        let actions = self.actions.iter().filter(|&shh| f(shh)).cloned().collect();
98        FilteredScratch { actions }
99    }
100
101    pub fn into_sync(self) -> SyncScratch {
102        SyncScratch(Arc::new(Mutex::new(self)))
103    }
104
105    pub fn len(&self) -> usize {
106        self.actions.len()
107    }
108
109    pub fn is_empty(&self) -> bool {
110        self.actions.is_empty() && self.scheduled_fns.is_empty()
111    }
112
113    pub fn actions(&self) -> impl Iterator<Item = &SignedActionHashed> {
114        self.actions.iter()
115    }
116
117    pub fn records(&self) -> impl Iterator<Item = Record> + '_ {
118        self.actions.iter().cloned().map(move |shh| {
119            let entry = shh
120                .action()
121                .entry_hash()
122                // TODO: let's use Arc<Entry> from here on instead of dereferencing
123                .and_then(|eh| self.entries.get(eh).map(|e| (**e).clone()));
124            Record::new(shh, entry)
125        })
126    }
127
128    /// Get the entries on in the scratch.
129    pub fn entries(&self) -> impl Iterator<Item = (&EntryHash, &Arc<Entry>)> {
130        self.entries.iter()
131    }
132
133    pub fn num_actions(&self) -> usize {
134        self.actions.len()
135    }
136
137    fn get_exact_record(&self, hash: &ActionHash) -> StateQueryResult<Option<Record>> {
138        Ok(self.get_action(hash)?.map(|shh| {
139            let entry = shh
140                .action()
141                .entry_hash()
142                .and_then(|eh| self.get_entry(eh).ok());
143            Record::new(shh, entry.flatten())
144        }))
145    }
146
147    fn get_any_record(&self, hash: &EntryHash) -> StateQueryResult<Option<Record>> {
148        let r = self.get_entry(hash)?.and_then(|entry| {
149            let shh = self
150                .actions()
151                .find(|&h| {
152                    h.action()
153                        .entry_hash()
154                        .map(|eh| eh == hash)
155                        .unwrap_or(false)
156                })?
157                .clone();
158            Some(Record::new(shh, Some(entry)))
159        });
160        Ok(r)
161    }
162
163    pub fn drain_scheduled_fns(&mut self) -> impl Iterator<Item = ScheduledFn> + '_ {
164        self.scheduled_fns.drain(..)
165    }
166
167    /// Drain out all the actions.
168    pub fn drain_actions(&mut self) -> impl Iterator<Item = SignedActionHashed> + '_ {
169        self.chain_head = None;
170        self.actions.drain(..)
171    }
172
173    /// Drain out all the entries.
174    pub fn drain_entries(&mut self) -> impl Iterator<Item = EntryHashed> + '_ {
175        self.entries.drain().map(|(hash, entry)| {
176            EntryHashed::with_pre_hashed(
177                Arc::try_unwrap(entry).unwrap_or_else(|e| (*e).clone()),
178                hash,
179            )
180        })
181    }
182}
183
184impl SyncScratch {
185    pub fn apply<T, F: FnOnce(&mut Scratch) -> T>(&self, f: F) -> Result<T, SyncScratchError> {
186        Ok(f(&mut *self
187            .0
188            .lock()
189            .map_err(|_| SyncScratchError::ScratchLockPoison)?))
190    }
191
192    pub fn apply_and_then<T, E, F>(&self, f: F) -> Result<T, E>
193    where
194        E: From<SyncScratchError>,
195        F: FnOnce(&mut Scratch) -> Result<T, E>,
196    {
197        f(&mut *self
198            .0
199            .lock()
200            .map_err(|_| SyncScratchError::ScratchLockPoison)?)
201    }
202}
203
204impl Store for Scratch {
205    fn get_entry(&self, hash: &EntryHash) -> StateQueryResult<Option<Entry>> {
206        Ok(self.entries.get(hash).map(|arc| (**arc).clone()))
207    }
208
209    fn contains_entry(&self, hash: &EntryHash) -> StateQueryResult<bool> {
210        Ok(self.entries.contains_key(hash))
211    }
212
213    fn contains_action(&self, hash: &ActionHash) -> StateQueryResult<bool> {
214        Ok(self.actions().any(|h| h.action_address() == hash))
215    }
216
217    fn get_action(&self, hash: &ActionHash) -> StateQueryResult<Option<SignedActionHashed>> {
218        Ok(self
219            .actions()
220            .find(|&h| h.action_address() == hash)
221            .cloned())
222    }
223
224    fn get_warrants_for_basis(
225        &self,
226        _hash: &AnyLinkableHash,
227        _check_validity: bool,
228    ) -> StateQueryResult<Vec<WarrantOp>> {
229        unimplemented!(
230            "Warrants are not committed to the chain, so the scratch will never contain one."
231        )
232    }
233
234    fn get_record(&self, hash: &AnyDhtHash) -> StateQueryResult<Option<Record>> {
235        match hash.clone().into_primitive() {
236            AnyDhtHashPrimitive::Entry(hash) => self.get_any_record(&hash),
237            AnyDhtHashPrimitive::Action(hash) => self.get_exact_record(&hash),
238        }
239    }
240
241    /// It doesn't make sense to search for
242    /// a different authored entry in a scratch
243    /// then the scratches author so this is
244    /// the same as `get_entry`.
245    fn get_public_or_authored_entry(
246        &self,
247        hash: &EntryHash,
248        _author: Option<&AgentPubKey>,
249    ) -> StateQueryResult<Option<Entry>> {
250        self.get_entry(hash)
251    }
252
253    /// It doesn't make sense to search for
254    /// a different authored record in a scratch
255    /// then the scratches author so this is
256    /// the same as `get_record`.
257    fn get_public_or_authored_record(
258        &self,
259        hash: &AnyDhtHash,
260        _author: Option<&AgentPubKey>,
261    ) -> StateQueryResult<Option<Record>> {
262        self.get_record(hash)
263    }
264}
265
266impl FilteredScratch {
267    pub fn drain(&mut self) -> impl Iterator<Item = SignedActionHashed> + '_ {
268        self.actions.drain(..)
269    }
270}
271
272impl<Q> Stores<Q> for Scratch
273where
274    Q: Query<Item = Judged<SignedActionHashed>>,
275{
276    type O = FilteredScratch;
277
278    fn get_initial_data(&self, query: Q) -> StateQueryResult<Self::O> {
279        Ok(self.as_filter(query.as_filter()))
280    }
281}
282
283impl StoresIter<Judged<SignedActionHashed>> for FilteredScratch {
284    fn iter(&mut self) -> StateQueryResult<StmtIter<'_, Judged<SignedActionHashed>>> {
285        // We are assuming data in the scratch space is valid even though
286        // it hasn't been validated yet because if it does fail validation
287        // then this transaction will be rolled back.
288        // TODO: Write test to prove this assumption.
289        Ok(Box::new(fallible_iterator::convert(
290            self.drain().map(Judged::valid).map(Ok),
291        )))
292    }
293}
294
295#[derive(Error, Debug)]
296pub enum ScratchError {
297    #[error(transparent)]
298    Timestamp(#[from] TimestampError),
299
300    #[error(transparent)]
301    Keystore(#[from] KeystoreError),
302
303    #[error(transparent)]
304    Action(#[from] ActionError),
305
306    /// Other
307    #[error("Other: {0}")]
308    Other(Box<dyn std::error::Error + Send + Sync>),
309}
310
311impl ScratchError {
312    /// promote a custom error type to a ScratchError
313    pub fn other(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
314        Self::Other(e.into())
315    }
316}
317
318impl From<one_err::OneErr> for ScratchError {
319    fn from(e: one_err::OneErr) -> Self {
320        Self::other(e)
321    }
322}
323
324#[derive(Error, Debug)]
325pub enum SyncScratchError {
326    #[error("Scratch lock was poisoned")]
327    ScratchLockPoison,
328}
329
/// Two connections opened with an empty path are independent databases:
/// a row inserted through one is not visible through the other.
#[test]
fn test_multiple_in_memory() {
    use holochain_sqlite::rusqlite::*;

    let schema = "
CREATE TABLE mytable (
    x INTEGER PRIMARY KEY
);
    ";

    // Run `sql` inside a fresh transaction and collect the first column of
    // every returned row.
    let read_xs = |conn: &mut Connection, sql: &str| -> Vec<u16> {
        let txn = conn.transaction().unwrap();
        let mut stmt = txn.prepare_cached(sql).unwrap();
        let rows = stmt.query_map([], |row| row.get(0)).unwrap();
        let xs = rows.collect::<Result<Vec<_>, _>>().unwrap();
        xs
    };

    // blank string means "temporary database", which typically resides in
    // memory but can be flushed to disk if sqlite is under memory pressure
    let mut m1 = Connection::open("").unwrap();
    let mut m2 = Connection::open("").unwrap();

    for conn in [&m1, &m2] {
        conn.execute(schema, []).unwrap();
    }

    // Only the first connection receives a row.
    let inserted = m1
        .execute("INSERT INTO mytable (x) VALUES (1)", [])
        .unwrap();
    assert_eq!(inserted, 1);

    let xs1 = read_xs(&mut m1, "SELECT x FROM mytable");
    let xs2 = read_xs(&mut m2, "SELECT * FROM mytable");

    assert_eq!(xs1, vec![1]);
    assert!(xs2.is_empty());
}