1#![allow(missing_docs)]
5
6use std::{
7 collections::BTreeSet,
8 fmt::Debug,
9 ops::{Deref, DerefMut},
10 sync::Arc,
11};
12
13use anyhow::{bail, Result};
14use futures_lite::future::Boxed as BoxedFuture;
15use futures_util::future::BoxFuture;
16use iroh::{endpoint::Connection, protocol::ProtocolHandler, Endpoint, NodeAddr};
17use serde::{Deserialize, Serialize};
18use tracing::debug;
19
20use crate::{
21 downloader::{ConcurrencyLimits, Downloader, RetryConfig},
22 provider::EventSender,
23 store::GcConfig,
24 util::{
25 local_pool::{self, LocalPool, LocalPoolHandle},
26 SetTagOption,
27 },
28 BlobFormat, Hash,
29};
30
31pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
33
34#[derive(derive_more::Debug)]
36enum GcState {
37 Initial(#[debug(skip)] Vec<ProtectCb>),
39 Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
41}
42
43impl Default for GcState {
44 fn default() -> Self {
45 Self::Initial(Vec::new())
46 }
47}
48
49#[derive(Debug)]
50enum Rt {
51 Handle(LocalPoolHandle),
52 Owned(LocalPool),
53}
54
55impl Deref for Rt {
56 type Target = LocalPoolHandle;
57
58 fn deref(&self) -> &Self::Target {
59 match self {
60 Self::Handle(ref handle) => handle,
61 Self::Owned(ref pool) => pool.handle(),
62 }
63 }
64}
65
66#[derive(Debug)]
67pub(crate) struct BlobsInner<S> {
68 rt: Rt,
69 pub(crate) store: S,
70 events: EventSender,
71 pub(crate) downloader: Downloader,
72 pub(crate) endpoint: Endpoint,
73 gc_state: std::sync::Mutex<GcState>,
74 #[cfg(feature = "rpc")]
75 pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
76}
77
78impl<S> BlobsInner<S> {
79 pub(crate) fn rt(&self) -> &LocalPoolHandle {
80 &self.rt
81 }
82}
83
84#[derive(Debug, Clone)]
85pub struct Blobs<S> {
86 pub(crate) inner: Arc<BlobsInner<S>>,
87 #[cfg(feature = "rpc")]
88 pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
89}
90
91#[cfg(feature = "rpc")]
93#[derive(Debug, Default)]
94pub(crate) struct BlobBatches {
95 batches: std::collections::BTreeMap<BatchId, BlobBatch>,
97 max: u64,
99}
100
101#[cfg(feature = "rpc")]
103#[derive(Debug, Default)]
104struct BlobBatch {
105 tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
107}
108
109#[cfg(feature = "rpc")]
110impl BlobBatches {
111 pub fn create(&mut self) -> BatchId {
113 let id = self.max;
114 self.max += 1;
115 BatchId(id)
116 }
117
118 pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
120 let entry = self.batches.entry(batch).or_default();
121 entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
122 }
123
124 pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
126 if let Some(batch) = self.batches.get_mut(&batch) {
127 if let Some(tags) = batch.tags.get_mut(content) {
128 tags.pop();
129 if tags.is_empty() {
130 batch.tags.remove(content);
131 }
132 return Ok(());
133 }
134 }
135 anyhow::bail!("tag not found in batch");
137 }
138
139 pub fn remove(&mut self, batch: BatchId) {
141 self.batches.remove(&batch);
142 }
143}
144
145#[derive(Debug)]
147pub struct Builder<S> {
148 store: S,
149 events: Option<EventSender>,
150 rt: Option<LocalPoolHandle>,
151 concurrency_limits: Option<ConcurrencyLimits>,
152 retry_config: Option<RetryConfig>,
153}
154
155impl<S: crate::store::Store> Builder<S> {
156 pub fn events(mut self, value: EventSender) -> Self {
158 self.events = Some(value);
159 self
160 }
161
162 pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
164 self.rt = Some(rt);
165 self
166 }
167
168 pub fn concurrency_limits(mut self, concurrency_limits: ConcurrencyLimits) -> Self {
170 self.concurrency_limits = Some(concurrency_limits);
171 self
172 }
173
174 pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
176 self.retry_config = Some(retry_config);
177 self
178 }
179
180 pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
183 let rt = self
184 .rt
185 .map(Rt::Handle)
186 .unwrap_or_else(|| Rt::Owned(LocalPool::default()));
187 let downloader = Downloader::with_config(
188 self.store.clone(),
189 endpoint.clone(),
190 rt.clone(),
191 self.concurrency_limits.unwrap_or_default(),
192 self.retry_config.unwrap_or_default(),
193 );
194 Blobs::new(
195 self.store,
196 rt,
197 self.events.unwrap_or_default(),
198 downloader,
199 endpoint.clone(),
200 )
201 }
202}
203
204impl<S> Blobs<S> {
205 pub fn builder(store: S) -> Builder<S> {
207 Builder {
208 store,
209 events: None,
210 rt: None,
211 concurrency_limits: None,
212 retry_config: None,
213 }
214 }
215}
216
217impl Blobs<crate::store::mem::Store> {
218 pub fn memory() -> Builder<crate::store::mem::Store> {
220 Self::builder(crate::store::mem::Store::new())
221 }
222}
223
224impl Blobs<crate::store::fs::Store> {
225 pub async fn persistent(
227 path: impl AsRef<std::path::Path>,
228 ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
229 Ok(Self::builder(crate::store::fs::Store::load(path).await?))
230 }
231}
232
233impl<S: crate::store::Store> Blobs<S> {
234 fn new(
235 store: S,
236 rt: Rt,
237 events: EventSender,
238 downloader: Downloader,
239 endpoint: Endpoint,
240 ) -> Self {
241 Self {
242 inner: Arc::new(BlobsInner {
243 rt,
244 store,
245 events,
246 downloader,
247 endpoint,
248 #[cfg(feature = "rpc")]
249 batches: Default::default(),
250 gc_state: Default::default(),
251 }),
252 #[cfg(feature = "rpc")]
253 rpc_handler: Default::default(),
254 }
255 }
256
257 pub fn store(&self) -> &S {
258 &self.inner.store
259 }
260
261 pub fn events(&self) -> &EventSender {
262 &self.inner.events
263 }
264
265 pub fn rt(&self) -> &LocalPoolHandle {
266 self.inner.rt()
267 }
268
269 pub fn downloader(&self) -> &Downloader {
270 &self.inner.downloader
271 }
272
273 pub fn endpoint(&self) -> &Endpoint {
274 &self.inner.endpoint
275 }
276
277 pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
281 let mut state = self.inner.gc_state.lock().unwrap();
282 match &mut *state {
283 GcState::Initial(cbs) => {
284 cbs.push(cb);
285 }
286 GcState::Started(_) => {
287 anyhow::bail!("cannot add protected blobs after gc has started");
288 }
289 }
290 Ok(())
291 }
292
293 pub fn start_gc(&self, config: GcConfig) -> Result<()> {
295 let mut state = self.inner.gc_state.lock().unwrap();
296 let protected = match state.deref_mut() {
297 GcState::Initial(items) => std::mem::take(items),
298 GcState::Started(_) => bail!("gc already started"),
299 };
300 let protected = Arc::new(protected);
301 let protected_cb = move || {
302 let protected = protected.clone();
303 async move {
304 let mut set = BTreeSet::new();
305 for cb in protected.iter() {
306 cb(&mut set).await;
307 }
308 set
309 }
310 };
311 let store = self.store().clone();
312 let run = self
313 .rt()
314 .spawn(move || async move { store.gc_run(config, protected_cb).await });
315 *state = GcState::Started(Some(run));
316 Ok(())
317 }
318}
319
320impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
321 fn accept(&self, conn: Connection) -> BoxedFuture<Result<()>> {
322 let db = self.store().clone();
323 let events = self.events().clone();
324 let rt = self.rt().clone();
325
326 Box::pin(async move {
327 crate::provider::handle_connection(conn, db, events, rt).await;
328 Ok(())
329 })
330 }
331
332 fn shutdown(&self) -> BoxedFuture<()> {
333 let store = self.store().clone();
334 Box::pin(async move {
335 store.shutdown().await;
336 })
337 }
338}
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
342pub struct BlobDownloadRequest {
343 pub hash: Hash,
345 pub format: BlobFormat,
348 pub nodes: Vec<NodeAddr>,
355 pub tag: SetTagOption,
357 pub mode: DownloadMode,
359}
360
361#[derive(Debug, Clone, Serialize, Deserialize)]
363pub enum DownloadMode {
364 Direct,
369 Queued,
373}
374
375#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
377pub struct BatchId(pub u64);