1use crate::{
2 cache::{BlockInfo, CacheTracker, WriteInfo},
3 cidbytes::CidBytes,
4 db::*,
5 Block, BlockStore, Result, StoreStats, TempPin,
6};
7use fnv::FnvHashSet;
8use libipld::{cid, codec::References, store::StoreParams, Cid, Ipld};
9use parking_lot::Mutex;
10use std::{
11 borrow::Cow, collections::HashSet, convert::TryFrom, iter::FromIterator, marker::PhantomData,
12 mem, sync::Arc,
13};
14
15pub struct Transaction<'a, S> {
16 inner: &'a mut rusqlite::Connection,
17 info: TransactionInfo,
18 expired_temp_pins: Arc<Mutex<Vec<i64>>>,
19 _s: PhantomData<S>,
20}
21
22struct TransactionInfo {
23 written: Vec<WriteInfo>,
24 accessed: Vec<BlockInfo>,
25 committed: bool,
26 tracker: Arc<dyn CacheTracker>,
27}
28
29impl Drop for TransactionInfo {
30 fn drop(&mut self) {
31 if !self.accessed.is_empty() {
32 let blocks = mem::take(&mut self.accessed);
33 self.tracker.blocks_accessed(blocks);
34 }
35 if self.committed && !self.written.is_empty() {
37 let blocks = mem::take(&mut self.written);
38 self.tracker.blocks_written(blocks);
39 }
40 }
41}
42
43impl<'a, S> Transaction<'a, S>
44where
45 S: StoreParams,
46 Ipld: References<S::Codecs>,
47{
48 pub(crate) fn new(owner: &'a mut BlockStore<S>) -> Self {
49 Self {
50 inner: &mut owner.conn,
51 info: TransactionInfo {
52 written: Vec::new(),
53 accessed: Vec::new(),
54 committed: false,
55 tracker: owner.config.cache_tracker.clone(),
56 },
57 expired_temp_pins: owner.expired_temp_pins.clone(),
58 _s: PhantomData,
59 }
60 }
61
62 pub fn alias<'b>(
64 &mut self,
65 name: impl Into<Cow<'b, [u8]>>,
66 link: Option<&'b Cid>,
67 ) -> Result<()> {
68 let link: Option<CidBytes> = link.map(CidBytes::try_from).transpose()?;
69 let name = name.into().into_owned();
70 in_txn(self.inner, None, true, move |txn| {
71 alias(txn, name.as_ref(), link.as_ref())
72 })?;
73 Ok(())
74 }
75
76 pub fn reverse_alias(&mut self, cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>> {
78 let cid = CidBytes::try_from(cid)?;
79 in_txn(self.inner, None, true, move |txn| {
80 reverse_alias(txn, cid.as_ref())
81 })
82 }
83
84 pub fn resolve<'b>(&mut self, name: impl Into<Cow<'b, [u8]>>) -> Result<Option<Cid>> {
86 let name = name.into().into_owned();
87 in_txn(self.inner, None, true, move |txn| {
88 resolve::<CidBytes>(txn, name.as_ref())?
89 .map(|c| Cid::try_from(&c))
90 .transpose()
91 .map_err(Into::into)
92 })
93 }
94
95 pub fn temp_pin(&mut self) -> TempPin {
97 TempPin::new(self.expired_temp_pins.clone())
98 }
99
100 pub fn extend_temp_pin(&mut self, pin: &mut TempPin, link: &Cid) -> Result<()> {
102 let link = CidBytes::try_from(link)?;
103 let id = pin.id;
104 pin.id = in_txn(self.inner, None, true, move |txn| {
105 extend_temp_pin(txn, id, vec![link])
106 })?;
107 Ok(())
108 }
109
110 pub fn has_cid(&mut self, cid: &Cid) -> Result<bool> {
114 let cid = CidBytes::try_from(cid)?;
115 in_txn(self.inner, None, false, move |txn| has_cid(txn, cid))
116 }
117
118 pub fn has_block(&mut self, cid: &Cid) -> Result<bool> {
120 let cid = CidBytes::try_from(cid)?;
121 in_txn(self.inner, None, false, move |txn| has_block(txn, cid))
122 }
123
124 pub fn get_known_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
126 let res = in_txn(self.inner, None, false, move |txn| {
127 get_known_cids::<CidBytes>(txn)
128 })?;
129 let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
130 Ok(res)
131 }
132
133 pub fn get_block_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
135 let res = in_txn(self.inner, None, false, move |txn| {
136 get_block_cids::<CidBytes>(txn)
137 })?;
138 let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
139 Ok(res)
140 }
141
142 pub fn get_descendants<C: FromIterator<Cid>>(&mut self, cid: &Cid) -> Result<C> {
144 let cid = CidBytes::try_from(cid)?;
145 let res = in_txn(self.inner, None, false, move |txn| {
146 get_descendants(txn, cid)
147 })?;
148 let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
149 Ok(res)
150 }
151
152 pub fn get_missing_blocks<C: FromIterator<Cid>>(&mut self, cid: &Cid) -> Result<C> {
154 let cid = CidBytes::try_from(cid)?;
155 let result = in_txn(self.inner, None, false, move |txn| {
156 get_missing_blocks(txn, cid)
157 })?;
158 let res = result
159 .iter()
160 .map(Cid::try_from)
161 .collect::<cid::Result<C>>()?;
162 Ok(res)
163 }
164
165 pub fn aliases<C: FromIterator<(Vec<u8>, Cid)>>(&mut self) -> Result<C> {
167 let result: Vec<(Vec<u8>, CidBytes)> =
168 in_txn(self.inner, None, false, move |txn| aliases(txn))?;
169 let res = result
170 .into_iter()
171 .map(|(alias, cid)| {
172 let cid = Cid::try_from(&cid)?;
173 Ok((alias, cid))
174 })
175 .collect::<cid::Result<C>>()?;
176 Ok(res)
177 }
178
179 pub fn put_block(&mut self, block: Block<S>, pin: Option<&mut TempPin>) -> Result<()> {
181 let cid_bytes = CidBytes::try_from(block.cid())?;
182 let mut links = Vec::new();
183 block.references(&mut links)?;
184 let links = links
185 .iter()
186 .map(CidBytes::try_from)
187 .collect::<std::result::Result<FnvHashSet<_>, cid::Error>>()?;
188 let id = pin.as_ref().map(|p| p.id);
189 let cid = *block.cid();
190 let len = block.data().len();
191 let (opt_id, res) = in_txn(self.inner, None, true, move |txn| {
192 put_block(txn, &cid_bytes, block.data(), links.iter().copied(), id)
193 })?;
194 if let (Some(id), Some(pin)) = (opt_id, pin) {
195 pin.id = id;
196 }
197 let write_info = WriteInfo::new(BlockInfo::new(res.id, &cid, len), res.block_exists);
198 self.info.written.push(write_info);
199 Ok(())
200 }
201
202 pub fn get_block(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
204 let cid1 = *cid;
205 let response = in_txn(self.inner, None, false, move |txn| {
206 get_block(txn, &CidBytes::try_from(&cid1)?)
207 })?;
208 if let Some(info) = response
209 .as_ref()
210 .map(|(id, data)| BlockInfo::new(*id, cid, data.len()))
211 {
212 self.info.accessed.push(info);
213 }
214 Ok(response.map(|(_id, data)| data))
215 }
216
217 pub fn get_store_stats(&mut self) -> Result<StoreStats> {
221 in_txn(self.inner, None, false, get_store_stats)
222 }
223
224 pub fn commit(mut self) -> Result<()> {
226 self.info.committed = true;
227 Ok(())
228 }
229}