ipfs_sqlite_block_store/
transaction.rs

1use crate::{
2    cache::{BlockInfo, CacheTracker, WriteInfo},
3    cidbytes::CidBytes,
4    db::*,
5    Block, BlockStore, Result, StoreStats, TempPin,
6};
7use fnv::FnvHashSet;
8use libipld::{cid, codec::References, store::StoreParams, Cid, Ipld};
9use parking_lot::Mutex;
10use std::{
11    borrow::Cow, collections::HashSet, convert::TryFrom, iter::FromIterator, marker::PhantomData,
12    mem, sync::Arc,
13};
14
15pub struct Transaction<'a, S> {
16    inner: &'a mut rusqlite::Connection,
17    info: TransactionInfo,
18    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
19    _s: PhantomData<S>,
20}
21
22struct TransactionInfo {
23    written: Vec<WriteInfo>,
24    accessed: Vec<BlockInfo>,
25    committed: bool,
26    tracker: Arc<dyn CacheTracker>,
27}
28
29impl Drop for TransactionInfo {
30    fn drop(&mut self) {
31        if !self.accessed.is_empty() {
32            let blocks = mem::take(&mut self.accessed);
33            self.tracker.blocks_accessed(blocks);
34        }
35        // if the transaction was not committed, we don't report blocks written!
36        if self.committed && !self.written.is_empty() {
37            let blocks = mem::take(&mut self.written);
38            self.tracker.blocks_written(blocks);
39        }
40    }
41}
42
43impl<'a, S> Transaction<'a, S>
44where
45    S: StoreParams,
46    Ipld: References<S::Codecs>,
47{
48    pub(crate) fn new(owner: &'a mut BlockStore<S>) -> Self {
49        Self {
50            inner: &mut owner.conn,
51            info: TransactionInfo {
52                written: Vec::new(),
53                accessed: Vec::new(),
54                committed: false,
55                tracker: owner.config.cache_tracker.clone(),
56            },
57            expired_temp_pins: owner.expired_temp_pins.clone(),
58            _s: PhantomData,
59        }
60    }
61
62    /// Set or delete an alias
63    pub fn alias<'b>(
64        &mut self,
65        name: impl Into<Cow<'b, [u8]>>,
66        link: Option<&'b Cid>,
67    ) -> Result<()> {
68        let link: Option<CidBytes> = link.map(CidBytes::try_from).transpose()?;
69        let name = name.into().into_owned();
70        in_txn(self.inner, None, true, move |txn| {
71            alias(txn, name.as_ref(), link.as_ref())
72        })?;
73        Ok(())
74    }
75
76    /// Returns the aliases referencing a cid.
77    pub fn reverse_alias(&mut self, cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>> {
78        let cid = CidBytes::try_from(cid)?;
79        in_txn(self.inner, None, true, move |txn| {
80            reverse_alias(txn, cid.as_ref())
81        })
82    }
83
84    /// Resolves an alias to a cid.
85    pub fn resolve<'b>(&mut self, name: impl Into<Cow<'b, [u8]>>) -> Result<Option<Cid>> {
86        let name = name.into().into_owned();
87        in_txn(self.inner, None, true, move |txn| {
88            resolve::<CidBytes>(txn, name.as_ref())?
89                .map(|c| Cid::try_from(&c))
90                .transpose()
91                .map_err(Into::into)
92        })
93    }
94
95    /// Get a temporary pin for safely adding blocks to the store
96    pub fn temp_pin(&mut self) -> TempPin {
97        TempPin::new(self.expired_temp_pins.clone())
98    }
99
100    /// Extend temp pin with an additional cid
101    pub fn extend_temp_pin(&mut self, pin: &mut TempPin, link: &Cid) -> Result<()> {
102        let link = CidBytes::try_from(link)?;
103        let id = pin.id;
104        pin.id = in_txn(self.inner, None, true, move |txn| {
105            extend_temp_pin(txn, id, vec![link])
106        })?;
107        Ok(())
108    }
109
110    /// Checks if the store knows about the cid.
111    ///
112    /// Note that this does not necessarily mean that the store has the data for the cid.
113    pub fn has_cid(&mut self, cid: &Cid) -> Result<bool> {
114        let cid = CidBytes::try_from(cid)?;
115        in_txn(self.inner, None, false, move |txn| has_cid(txn, cid))
116    }
117
118    /// Checks if the store has the data for a cid
119    pub fn has_block(&mut self, cid: &Cid) -> Result<bool> {
120        let cid = CidBytes::try_from(cid)?;
121        in_txn(self.inner, None, false, move |txn| has_block(txn, cid))
122    }
123
124    /// Get all cids that the store knows about
125    pub fn get_known_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
126        let res = in_txn(self.inner, None, false, move |txn| {
127            get_known_cids::<CidBytes>(txn)
128        })?;
129        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
130        Ok(res)
131    }
132
133    /// Get all cids for which the store has blocks
134    pub fn get_block_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
135        let res = in_txn(self.inner, None, false, move |txn| {
136            get_block_cids::<CidBytes>(txn)
137        })?;
138        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
139        Ok(res)
140    }
141
142    /// Get descendants of a cid
143    pub fn get_descendants<C: FromIterator<Cid>>(&mut self, cid: &Cid) -> Result<C> {
144        let cid = CidBytes::try_from(cid)?;
145        let res = in_txn(self.inner, None, false, move |txn| {
146            get_descendants(txn, cid)
147        })?;
148        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
149        Ok(res)
150    }
151
152    /// Given a root of a dag, gives all cids which we do not have data for.
153    pub fn get_missing_blocks<C: FromIterator<Cid>>(&mut self, cid: &Cid) -> Result<C> {
154        let cid = CidBytes::try_from(cid)?;
155        let result = in_txn(self.inner, None, false, move |txn| {
156            get_missing_blocks(txn, cid)
157        })?;
158        let res = result
159            .iter()
160            .map(Cid::try_from)
161            .collect::<cid::Result<C>>()?;
162        Ok(res)
163    }
164
165    /// list all aliases
166    pub fn aliases<C: FromIterator<(Vec<u8>, Cid)>>(&mut self) -> Result<C> {
167        let result: Vec<(Vec<u8>, CidBytes)> =
168            in_txn(self.inner, None, false, move |txn| aliases(txn))?;
169        let res = result
170            .into_iter()
171            .map(|(alias, cid)| {
172                let cid = Cid::try_from(&cid)?;
173                Ok((alias, cid))
174            })
175            .collect::<cid::Result<C>>()?;
176        Ok(res)
177    }
178
179    /// Put a block. This will only be completed once the transaction is successfully committed
180    pub fn put_block(&mut self, block: Block<S>, pin: Option<&mut TempPin>) -> Result<()> {
181        let cid_bytes = CidBytes::try_from(block.cid())?;
182        let mut links = Vec::new();
183        block.references(&mut links)?;
184        let links = links
185            .iter()
186            .map(CidBytes::try_from)
187            .collect::<std::result::Result<FnvHashSet<_>, cid::Error>>()?;
188        let id = pin.as_ref().map(|p| p.id);
189        let cid = *block.cid();
190        let len = block.data().len();
191        let (opt_id, res) = in_txn(self.inner, None, true, move |txn| {
192            put_block(txn, &cid_bytes, block.data(), links.iter().copied(), id)
193        })?;
194        if let (Some(id), Some(pin)) = (opt_id, pin) {
195            pin.id = id;
196        }
197        let write_info = WriteInfo::new(BlockInfo::new(res.id, &cid, len), res.block_exists);
198        self.info.written.push(write_info);
199        Ok(())
200    }
201
202    /// Get a block
203    pub fn get_block(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
204        let cid1 = *cid;
205        let response = in_txn(self.inner, None, false, move |txn| {
206            get_block(txn, &CidBytes::try_from(&cid1)?)
207        })?;
208        if let Some(info) = response
209            .as_ref()
210            .map(|(id, data)| BlockInfo::new(*id, cid, data.len()))
211        {
212            self.info.accessed.push(info);
213        }
214        Ok(response.map(|(_id, data)| data))
215    }
216
217    /// Get the stats for the store.
218    ///
219    /// The stats are kept up to date, so this is fast.
220    pub fn get_store_stats(&mut self) -> Result<StoreStats> {
221        in_txn(self.inner, None, false, get_store_stats)
222    }
223
224    /// Commit and consume the transaction. Default is to not commit.
225    pub fn commit(mut self) -> Result<()> {
226        self.info.committed = true;
227        Ok(())
228    }
229}