1#![allow(missing_docs)]
5
6use std::{
7 collections::BTreeSet,
8 fmt::Debug,
9 ops::{Deref, DerefMut},
10 sync::Arc,
11};
12
13use anyhow::{bail, Result};
14use futures_lite::future::Boxed as BoxedFuture;
15use futures_util::future::BoxFuture;
16use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
17use serde::{Deserialize, Serialize};
18use tracing::debug;
19
20use crate::{
21 downloader::Downloader,
22 provider::EventSender,
23 store::GcConfig,
24 util::{
25 local_pool::{self, LocalPool, LocalPoolHandle},
26 SetTagOption,
27 },
28 BlobFormat, Hash,
29};
30
31pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
33
34#[derive(derive_more::Debug)]
36enum GcState {
37 Initial(#[debug(skip)] Vec<ProtectCb>),
39 Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
41}
42
43impl Default for GcState {
44 fn default() -> Self {
45 Self::Initial(Vec::new())
46 }
47}
48
49#[derive(Debug)]
50enum Rt {
51 Handle(LocalPoolHandle),
52 Owned(LocalPool),
53}
54
55impl Deref for Rt {
56 type Target = LocalPoolHandle;
57
58 fn deref(&self) -> &Self::Target {
59 match self {
60 Self::Handle(ref handle) => handle,
61 Self::Owned(ref pool) => pool.handle(),
62 }
63 }
64}
65
66#[derive(Debug)]
67pub(crate) struct BlobsInner<S> {
68 rt: Rt,
69 pub(crate) store: S,
70 events: EventSender,
71 pub(crate) downloader: Downloader,
72 pub(crate) endpoint: Endpoint,
73 gc_state: std::sync::Mutex<GcState>,
74 #[cfg(feature = "rpc")]
75 pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
76}
77
78impl<S> BlobsInner<S> {
79 pub(crate) fn rt(&self) -> &LocalPoolHandle {
80 &self.rt
81 }
82}
83
84#[derive(Debug, Clone)]
85pub struct Blobs<S> {
86 pub(crate) inner: Arc<BlobsInner<S>>,
87 #[cfg(feature = "rpc")]
88 pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
89}
90
91#[cfg(feature = "rpc")]
93#[derive(Debug, Default)]
94pub(crate) struct BlobBatches {
95 batches: std::collections::BTreeMap<BatchId, BlobBatch>,
97 max: u64,
99}
100
101#[cfg(feature = "rpc")]
103#[derive(Debug, Default)]
104struct BlobBatch {
105 tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
107}
108
109#[cfg(feature = "rpc")]
110impl BlobBatches {
111 pub fn create(&mut self) -> BatchId {
113 let id = self.max;
114 self.max += 1;
115 BatchId(id)
116 }
117
118 pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
120 let entry = self.batches.entry(batch).or_default();
121 entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
122 }
123
124 pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
126 if let Some(batch) = self.batches.get_mut(&batch) {
127 if let Some(tags) = batch.tags.get_mut(content) {
128 tags.pop();
129 if tags.is_empty() {
130 batch.tags.remove(content);
131 }
132 return Ok(());
133 }
134 }
135 anyhow::bail!("tag not found in batch");
137 }
138
139 pub fn remove(&mut self, batch: BatchId) {
141 self.batches.remove(&batch);
142 }
143}
144
145#[derive(Debug)]
147pub struct Builder<S> {
148 store: S,
149 events: Option<EventSender>,
150 rt: Option<LocalPoolHandle>,
151}
152
153impl<S: crate::store::Store> Builder<S> {
154 pub fn events(mut self, value: EventSender) -> Self {
156 self.events = Some(value);
157 self
158 }
159
160 pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
162 self.rt = Some(rt);
163 self
164 }
165
166 pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
169 let rt = self
170 .rt
171 .map(Rt::Handle)
172 .unwrap_or_else(|| Rt::Owned(LocalPool::default()));
173 let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
174 Blobs::new(
175 self.store,
176 rt,
177 self.events.unwrap_or_default(),
178 downloader,
179 endpoint.clone(),
180 )
181 }
182}
183
184impl<S> Blobs<S> {
185 pub fn builder(store: S) -> Builder<S> {
187 Builder {
188 store,
189 events: None,
190 rt: None,
191 }
192 }
193}
194
195impl Blobs<crate::store::mem::Store> {
196 pub fn memory() -> Builder<crate::store::mem::Store> {
198 Self::builder(crate::store::mem::Store::new())
199 }
200}
201
202impl Blobs<crate::store::fs::Store> {
203 pub async fn persistent(
205 path: impl AsRef<std::path::Path>,
206 ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
207 Ok(Self::builder(crate::store::fs::Store::load(path).await?))
208 }
209}
210
211impl<S: crate::store::Store> Blobs<S> {
212 fn new(
213 store: S,
214 rt: Rt,
215 events: EventSender,
216 downloader: Downloader,
217 endpoint: Endpoint,
218 ) -> Self {
219 Self {
220 inner: Arc::new(BlobsInner {
221 rt,
222 store,
223 events,
224 downloader,
225 endpoint,
226 #[cfg(feature = "rpc")]
227 batches: Default::default(),
228 gc_state: Default::default(),
229 }),
230 #[cfg(feature = "rpc")]
231 rpc_handler: Default::default(),
232 }
233 }
234
235 pub fn store(&self) -> &S {
236 &self.inner.store
237 }
238
239 pub fn events(&self) -> &EventSender {
240 &self.inner.events
241 }
242
243 pub fn rt(&self) -> &LocalPoolHandle {
244 self.inner.rt()
245 }
246
247 pub fn downloader(&self) -> &Downloader {
248 &self.inner.downloader
249 }
250
251 pub fn endpoint(&self) -> &Endpoint {
252 &self.inner.endpoint
253 }
254
255 pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
259 let mut state = self.inner.gc_state.lock().unwrap();
260 match &mut *state {
261 GcState::Initial(cbs) => {
262 cbs.push(cb);
263 }
264 GcState::Started(_) => {
265 anyhow::bail!("cannot add protected blobs after gc has started");
266 }
267 }
268 Ok(())
269 }
270
271 pub fn start_gc(&self, config: GcConfig) -> Result<()> {
273 let mut state = self.inner.gc_state.lock().unwrap();
274 let protected = match state.deref_mut() {
275 GcState::Initial(items) => std::mem::take(items),
276 GcState::Started(_) => bail!("gc already started"),
277 };
278 let protected = Arc::new(protected);
279 let protected_cb = move || {
280 let protected = protected.clone();
281 async move {
282 let mut set = BTreeSet::new();
283 for cb in protected.iter() {
284 cb(&mut set).await;
285 }
286 set
287 }
288 };
289 let store = self.store().clone();
290 let run = self
291 .rt()
292 .spawn(move || async move { store.gc_run(config, protected_cb).await });
293 *state = GcState::Started(Some(run));
294 Ok(())
295 }
296}
297
298impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
299 fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
300 let db = self.store().clone();
301 let events = self.events().clone();
302 let rt = self.rt().clone();
303
304 Box::pin(async move {
305 crate::provider::handle_connection(conn.await?, db, events, rt).await;
306 Ok(())
307 })
308 }
309
310 fn shutdown(&self) -> BoxedFuture<()> {
311 let store = self.store().clone();
312 Box::pin(async move {
313 store.shutdown().await;
314 })
315 }
316}
317
318#[derive(Debug, Clone, Serialize, Deserialize)]
320pub struct BlobDownloadRequest {
321 pub hash: Hash,
323 pub format: BlobFormat,
326 pub nodes: Vec<NodeAddr>,
333 pub tag: SetTagOption,
335 pub mode: DownloadMode,
337}
338
339#[derive(Debug, Clone, Serialize, Deserialize)]
341pub enum DownloadMode {
342 Direct,
347 Queued,
351}
352
353#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
355pub struct BatchId(pub u64);