iroh_blobs/util/
temp_tag.rs

1#![allow(dead_code)]
2use std::{
3    collections::HashMap,
4    sync::{Arc, Mutex, Weak},
5};
6
7use serde::{Deserialize, Serialize};
8use tracing::{trace, warn};
9
10use crate::{api::proto::Scope, BlobFormat, Hash, HashAndFormat};
11
12/// An ephemeral, in-memory tag that protects content while the process is running.
13///
14/// If format is raw, this will protect just the blob
15/// If format is collection, this will protect the collection and all blobs in it
16#[derive(Debug, Serialize, Deserialize)]
17#[must_use = "TempTag is a temporary tag that should be used to protect content while the process is running. \
18       If you want to keep the content alive, use TempTag::leak()"]
19pub struct TempTag {
20    /// The hash and format we are pinning
21    inner: HashAndFormat,
22    /// optional callback to call on drop
23    #[serde(skip)]
24    on_drop: Option<Weak<dyn TagDrop>>,
25}
26
27impl AsRef<Hash> for TempTag {
28    fn as_ref(&self) -> &Hash {
29        &self.inner.hash
30    }
31}
32
33/// A trait for things that can track liveness of blobs and collections.
34///
35/// This trait works together with [TempTag] to keep track of the liveness of a
36/// blob or collection.
37///
38/// It is important to include the format in the liveness tracking, since
39/// protecting a collection means protecting the blob and all its children,
40/// whereas protecting a raw blob only protects the blob itself.
41pub trait TagCounter: TagDrop + Sized {
42    /// Called on creation of a temp tag
43    fn on_create(&self, inner: &HashAndFormat);
44
45    /// Get this as a weak reference for use in temp tags
46    fn as_weak(self: &Arc<Self>) -> Weak<dyn TagDrop> {
47        let on_drop: Arc<dyn TagDrop> = self.clone();
48        Arc::downgrade(&on_drop)
49    }
50
51    /// Create a new temp tag for the given hash and format
52    fn temp_tag(self: &Arc<Self>, inner: HashAndFormat) -> TempTag {
53        self.on_create(&inner);
54        TempTag::new(inner, Some(self.as_weak()))
55    }
56}
57
58/// Trait used from temp tags to notify an abstract store that a temp tag is
59/// being dropped.
60pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static {
61    /// Called on drop
62    fn on_drop(&self, inner: &HashAndFormat);
63}
64
65impl From<&TempTag> for HashAndFormat {
66    fn from(val: &TempTag) -> Self {
67        val.inner
68    }
69}
70
71impl From<TempTag> for HashAndFormat {
72    fn from(val: TempTag) -> Self {
73        val.inner
74    }
75}
76
77impl TempTag {
78    /// Create a new temp tag for the given hash and format
79    ///
80    /// This should only be used by store implementations.
81    ///
82    /// The caller is responsible for increasing the refcount on creation and to
83    /// make sure that temp tags that are created between a mark phase and a sweep
84    /// phase are protected.
85    pub fn new(inner: HashAndFormat, on_drop: Option<Weak<dyn TagDrop>>) -> Self {
86        Self { inner, on_drop }
87    }
88
89    /// The empty temp tag. We don't track the empty blob since we always have it.
90    pub fn leaking_empty(format: BlobFormat) -> Self {
91        Self {
92            inner: HashAndFormat {
93                hash: Hash::EMPTY,
94                format,
95            },
96            on_drop: None,
97        }
98    }
99
100    /// The hash of the pinned item
101    pub fn inner(&self) -> &HashAndFormat {
102        &self.inner
103    }
104
105    /// The hash of the pinned item
106    pub fn hash(&self) -> &Hash {
107        &self.inner.hash
108    }
109
110    /// The format of the pinned item
111    pub fn format(&self) -> BlobFormat {
112        self.inner.format
113    }
114
115    /// The hash and format of the pinned item
116    pub fn hash_and_format(&self) -> &HashAndFormat {
117        &self.inner
118    }
119
120    /// Keep the item alive until the end of the process
121    pub fn leak(&mut self) {
122        // set the liveness tracker to None, so that the refcount is not decreased
123        // during drop. This means that the refcount will never reach 0 and the
124        // item will not be gced until the end of the process.
125        self.on_drop = None;
126    }
127}
128
129impl Drop for TempTag {
130    fn drop(&mut self) {
131        if let Some(on_drop) = self.on_drop.take() {
132            if let Some(on_drop) = on_drop.upgrade() {
133                on_drop.on_drop(&self.inner);
134            }
135        }
136    }
137}
138
139#[derive(Debug, Default, Clone)]
140struct TempCounters {
141    /// number of raw temp tags for a hash
142    raw: u64,
143    /// number of hash seq temp tags for a hash
144    hash_seq: u64,
145}
146
147impl TempCounters {
148    fn counter(&self, format: BlobFormat) -> u64 {
149        match format {
150            BlobFormat::Raw => self.raw,
151            BlobFormat::HashSeq => self.hash_seq,
152        }
153    }
154
155    fn counter_mut(&mut self, format: BlobFormat) -> &mut u64 {
156        match format {
157            BlobFormat::Raw => &mut self.raw,
158            BlobFormat::HashSeq => &mut self.hash_seq,
159        }
160    }
161
162    fn inc(&mut self, format: BlobFormat) {
163        let counter = self.counter_mut(format);
164        *counter = counter.checked_add(1).unwrap();
165    }
166
167    fn dec(&mut self, format: BlobFormat) {
168        let counter = self.counter_mut(format);
169        *counter = counter.saturating_sub(1);
170    }
171
172    fn is_empty(&self) -> bool {
173        self.raw == 0 && self.hash_seq == 0
174    }
175}
176
177#[derive(Debug, Default)]
178pub(crate) struct TempTags {
179    scopes: HashMap<Scope, Arc<TempTagScope>>,
180    next_scope: u64,
181}
182
183impl TempTags {
184    pub fn create_scope(&mut self) -> (Scope, Arc<TempTagScope>) {
185        self.next_scope += 1;
186        let id = Scope(self.next_scope);
187        let scope = self.scopes.entry(id).or_default();
188        (id, scope.clone())
189    }
190
191    pub fn end_scope(&mut self, scope: Scope) {
192        self.scopes.remove(&scope);
193    }
194
195    pub fn list(&self) -> Vec<HashAndFormat> {
196        self.scopes
197            .values()
198            .flat_map(|scope| scope.list())
199            .collect()
200    }
201
202    pub fn create(&mut self, scope: Scope, content: HashAndFormat) -> TempTag {
203        let scope = self.scopes.entry(scope).or_default();
204
205        scope.temp_tag(content)
206    }
207
208    pub fn contains(&self, hash: Hash) -> bool {
209        self.scopes
210            .values()
211            .any(|scope| scope.0.lock().unwrap().contains(&HashAndFormat::raw(hash)))
212    }
213}
214
215#[derive(Debug, Default)]
216pub(crate) struct TempTagScope(Mutex<TempCounterMap>);
217
218impl TempTagScope {
219    pub fn list(&self) -> impl Iterator<Item = HashAndFormat> + 'static {
220        let guard = self.0.lock().unwrap();
221        let res = guard.keys();
222        drop(guard);
223        res.into_iter()
224    }
225}
226
227impl TagDrop for TempTagScope {
228    fn on_drop(&self, inner: &HashAndFormat) {
229        trace!("Dropping temp tag {:?}", inner);
230        self.0.lock().unwrap().dec(inner);
231    }
232}
233
234impl TagCounter for TempTagScope {
235    fn on_create(&self, inner: &HashAndFormat) {
236        trace!("Creating temp tag {:?}", inner);
237        self.0.lock().unwrap().inc(*inner);
238    }
239}
240
241#[derive(Debug, Clone, Default)]
242pub(crate) struct TempCounterMap(HashMap<Hash, TempCounters>);
243
244impl TempCounterMap {
245    pub fn inc(&mut self, value: HashAndFormat) {
246        self.0.entry(value.hash).or_default().inc(value.format)
247    }
248
249    pub fn dec(&mut self, value: &HashAndFormat) {
250        let HashAndFormat { hash, format } = value;
251        let Some(counters) = self.0.get_mut(hash) else {
252            warn!("Decrementing non-existent temp tag");
253            return;
254        };
255        counters.dec(*format);
256        if counters.is_empty() {
257            self.0.remove(hash);
258        }
259    }
260
261    pub fn contains(&self, haf: &HashAndFormat) -> bool {
262        let Some(entry) = self.0.get(&haf.hash) else {
263            return false;
264        };
265        entry.counter(haf.format) > 0
266    }
267
268    pub fn keys(&self) -> Vec<HashAndFormat> {
269        let mut res = Vec::new();
270        for (k, v) in self.0.iter() {
271            if v.raw > 0 {
272                res.push(HashAndFormat::raw(*k));
273            }
274            if v.hash_seq > 0 {
275                res.push(HashAndFormat::hash_seq(*k));
276            }
277        }
278        res
279    }
280}