1#![allow(missing_docs)]
5
6use std::{
7 collections::BTreeSet,
8 fmt::Debug,
9 ops::{Deref, DerefMut},
10 sync::Arc,
11};
12
13use anyhow::{bail, Result};
14use futures_lite::future::Boxed as BoxedFuture;
15use futures_util::future::BoxFuture;
16use iroh::{endpoint::Connection, protocol::ProtocolHandler, Endpoint, NodeAddr};
17use serde::{Deserialize, Serialize};
18use tracing::debug;
19
20use crate::{
21 downloader::{self, ConcurrencyLimits, Downloader, RetryConfig},
22 metrics::Metrics,
23 provider::EventSender,
24 store::GcConfig,
25 util::{
26 local_pool::{self, LocalPool, LocalPoolHandle},
27 SetTagOption,
28 },
29 BlobFormat, Hash,
30};
31
32pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
34
35#[derive(derive_more::Debug)]
37enum GcState {
38 Initial(#[debug(skip)] Vec<ProtectCb>),
40 Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
42}
43
44impl Default for GcState {
45 fn default() -> Self {
46 Self::Initial(Vec::new())
47 }
48}
49
50#[derive(Debug)]
51enum Rt {
52 Handle(LocalPoolHandle),
53 Owned(LocalPool),
54}
55
56impl Deref for Rt {
57 type Target = LocalPoolHandle;
58
59 fn deref(&self) -> &Self::Target {
60 match self {
61 Self::Handle(ref handle) => handle,
62 Self::Owned(ref pool) => pool.handle(),
63 }
64 }
65}
66
67#[derive(Debug)]
68pub(crate) struct BlobsInner<S> {
69 rt: Rt,
70 pub(crate) store: S,
71 events: EventSender,
72 pub(crate) downloader: Downloader,
73 pub(crate) endpoint: Endpoint,
74 gc_state: std::sync::Mutex<GcState>,
75 #[cfg(feature = "rpc")]
76 pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
77}
78
79impl<S> BlobsInner<S> {
80 pub(crate) fn rt(&self) -> &LocalPoolHandle {
81 &self.rt
82 }
83}
84
85#[derive(Debug, Clone)]
86pub struct Blobs<S> {
87 pub(crate) inner: Arc<BlobsInner<S>>,
88 #[cfg(feature = "rpc")]
89 pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
90}
91
#[cfg(feature = "rpc")]
/// Table of all batches known to a `Blobs` instance.
#[derive(Debug, Default)]
pub(crate) struct BlobBatches {
    /// The batches, keyed by their id.
    batches: std::collections::BTreeMap<BatchId, BlobBatch>,
    /// Counter from which the next batch id is taken.
    max: u64,
}
101
#[cfg(feature = "rpc")]
/// The contents of a single batch.
#[derive(Debug, Default)]
struct BlobBatch {
    /// Temp tags held by this batch, grouped by the content they refer to.
    ///
    /// The same content may be tagged more than once, hence the `Vec`.
    tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
}
109
#[cfg(feature = "rpc")]
impl BlobBatches {
    /// Hands out a fresh batch id.
    ///
    /// The id is the current counter value; the counter is advanced afterwards. No entry is
    /// added to the batch table here, that happens on the first [`BlobBatches::store`].
    pub fn create(&mut self) -> BatchId {
        let id = BatchId(self.max);
        self.max += 1;
        id
    }

    /// Stores a temp tag in the given batch, creating the batch entry if it does not exist.
    pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
        let tags = &mut self.batches.entry(batch).or_default().tags;
        // Take the key before `tt` is moved into the map.
        let key = tt.hash_and_format();
        tags.entry(key).or_default().push(tt);
    }

    /// Removes one temp tag for `content` from the given batch.
    ///
    /// When the last tag for `content` is removed, the (now empty) entry for that content is
    /// removed as well.
    ///
    /// # Errors
    ///
    /// Fails if the batch is unknown or holds no tag for `content`.
    pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
        let removed = self.batches.get_mut(&batch).and_then(|entry| {
            let tags = entry.tags.get_mut(content)?;
            tags.pop();
            if tags.is_empty() {
                entry.tags.remove(content);
            }
            Some(())
        });
        if removed.is_none() {
            anyhow::bail!("tag not found in batch");
        }
        Ok(())
    }

    /// Removes an entire batch, dropping every temp tag it holds.
    ///
    /// Removing an unknown batch is a no-op.
    pub fn remove(&mut self, batch: BatchId) {
        drop(self.batches.remove(&batch));
    }
}
145
146#[derive(Debug)]
148pub struct Builder<S> {
149 store: S,
150 events: Option<EventSender>,
151 downloader_config: Option<crate::downloader::Config>,
152 rt: Option<LocalPoolHandle>,
153}
154
155impl<S: crate::store::Store> Builder<S> {
156 pub fn events(mut self, value: EventSender) -> Self {
158 self.events = Some(value);
159 self
160 }
161
162 pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
164 self.rt = Some(rt);
165 self
166 }
167
168 pub fn downloader_config(mut self, downloader_config: downloader::Config) -> Self {
170 self.downloader_config = Some(downloader_config);
171 self
172 }
173
174 pub fn concurrency_limits(mut self, concurrency_limits: ConcurrencyLimits) -> Self {
176 let downloader_config = self.downloader_config.get_or_insert_with(Default::default);
177 downloader_config.concurrency = concurrency_limits;
178 self
179 }
180
181 pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
183 let downloader_config = self.downloader_config.get_or_insert_with(Default::default);
184 downloader_config.retry = retry_config;
185 self
186 }
187
188 pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
191 let rt = self
192 .rt
193 .map(Rt::Handle)
194 .unwrap_or_else(|| Rt::Owned(LocalPool::default()));
195 let downloader_config = self.downloader_config.unwrap_or_default();
196 let downloader = Downloader::with_config(
197 self.store.clone(),
198 endpoint.clone(),
199 rt.clone(),
200 downloader_config,
201 );
202 Blobs::new(
203 self.store,
204 rt,
205 self.events.unwrap_or_default(),
206 downloader,
207 endpoint.clone(),
208 )
209 }
210}
211
212impl<S> Blobs<S> {
213 pub fn builder(store: S) -> Builder<S> {
215 Builder {
216 store,
217 events: None,
218 downloader_config: None,
219 rt: None,
220 }
221 }
222}
223
224impl Blobs<crate::store::mem::Store> {
225 pub fn memory() -> Builder<crate::store::mem::Store> {
227 Self::builder(crate::store::mem::Store::new())
228 }
229}
230
231impl Blobs<crate::store::fs::Store> {
232 pub async fn persistent(
234 path: impl AsRef<std::path::Path>,
235 ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
236 Ok(Self::builder(crate::store::fs::Store::load(path).await?))
237 }
238}
239
240impl<S: crate::store::Store> Blobs<S> {
241 fn new(
242 store: S,
243 rt: Rt,
244 events: EventSender,
245 downloader: Downloader,
246 endpoint: Endpoint,
247 ) -> Self {
248 Self {
249 inner: Arc::new(BlobsInner {
250 rt,
251 store,
252 events,
253 downloader,
254 endpoint,
255 #[cfg(feature = "rpc")]
256 batches: Default::default(),
257 gc_state: Default::default(),
258 }),
259 #[cfg(feature = "rpc")]
260 rpc_handler: Default::default(),
261 }
262 }
263
264 pub fn store(&self) -> &S {
265 &self.inner.store
266 }
267
268 pub fn metrics(&self) -> &Arc<Metrics> {
269 self.downloader().metrics()
270 }
271
272 pub fn events(&self) -> &EventSender {
273 &self.inner.events
274 }
275
276 pub fn rt(&self) -> &LocalPoolHandle {
277 self.inner.rt()
278 }
279
280 pub fn downloader(&self) -> &Downloader {
281 &self.inner.downloader
282 }
283
284 pub fn endpoint(&self) -> &Endpoint {
285 &self.inner.endpoint
286 }
287
288 pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
292 let mut state = self.inner.gc_state.lock().unwrap();
293 match &mut *state {
294 GcState::Initial(cbs) => {
295 cbs.push(cb);
296 }
297 GcState::Started(_) => {
298 anyhow::bail!("cannot add protected blobs after gc has started");
299 }
300 }
301 Ok(())
302 }
303
304 pub fn start_gc(&self, config: GcConfig) -> Result<()> {
306 let mut state = self.inner.gc_state.lock().unwrap();
307 let protected = match state.deref_mut() {
308 GcState::Initial(items) => std::mem::take(items),
309 GcState::Started(_) => bail!("gc already started"),
310 };
311 let protected = Arc::new(protected);
312 let protected_cb = move || {
313 let protected = protected.clone();
314 async move {
315 let mut set = BTreeSet::new();
316 for cb in protected.iter() {
317 cb(&mut set).await;
318 }
319 set
320 }
321 };
322 let store = self.store().clone();
323 let run = self
324 .rt()
325 .spawn(move || async move { store.gc_run(config, protected_cb).await });
326 *state = GcState::Started(Some(run));
327 Ok(())
328 }
329}
330
331impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
332 fn accept(&self, conn: Connection) -> BoxedFuture<Result<()>> {
333 let db = self.store().clone();
334 let events = self.events().clone();
335 let rt = self.rt().clone();
336
337 Box::pin(async move {
338 crate::provider::handle_connection(conn, db, events, rt).await;
339 Ok(())
340 })
341 }
342
343 fn shutdown(&self) -> BoxedFuture<()> {
344 let store = self.store().clone();
345 Box::pin(async move {
346 store.shutdown().await;
347 })
348 }
349}
350
351#[derive(Debug, Clone, Serialize, Deserialize)]
353pub struct BlobDownloadRequest {
354 pub hash: Hash,
356 pub format: BlobFormat,
359 pub nodes: Vec<NodeAddr>,
366 pub tag: SetTagOption,
368 pub mode: DownloadMode,
370}
371
372#[derive(Debug, Clone, Serialize, Deserialize)]
374pub enum DownloadMode {
375 Direct,
380 Queued,
384}
385
386#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
388pub struct BatchId(pub u64);