// p2panda_blobs/import.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

3use std::collections::BTreeMap;
4use std::io;
5use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7
8use anyhow::Result;
9use bytes::Bytes;
10use futures_lite::StreamExt;
11use futures_util::Stream;
12use iroh_blobs::provider::AddProgress;
13use iroh_blobs::store::{ImportMode, ImportProgress, Store};
14use iroh_blobs::util::local_pool::LocalPoolHandle;
15use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
16use iroh_blobs::{BlobFormat, HashAndFormat};
17use p2panda_core::Hash;
18use serde::{Deserialize, Serialize};
19use serde_error::Error as RpcError;
20
/// Status of a blob import attempt.
///
/// Emitted by [`import_blob`] and [`import_blob_from_stream`]; exactly one
/// terminal event is produced per import.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ImportBlobEvent {
    /// Import completed; carries the hash of the imported blob.
    Done(Hash),
    /// Import failed; carries the serialized error.
    // @TODO: Use own error type here instead of `serde_error::Error`.
    Abort(RpcError),
}
27
28pub(crate) async fn import_blob<S: Store>(
29    store: S,
30    pool_handle: LocalPoolHandle,
31    path: PathBuf,
32) -> impl Stream<Item = ImportBlobEvent> {
33    let (sender, receiver) = async_channel::bounded(32);
34
35    let sender = sender.clone();
36    pool_handle.spawn_detached(|| async move {
37        if let Err(err) = add_from_path(store, path, sender.clone()).await {
38            sender
39                .send(AddProgress::Abort(RpcError::new(&*err)))
40                .await
41                .ok();
42        }
43    });
44
45    receiver.filter_map(|event| {
46        match event {
47            AddProgress::AllDone { hash, .. } => {
48                Some(ImportBlobEvent::Done(Hash::from_bytes(*hash.as_bytes())))
49            }
50            // @TODO: Use own error type here
51            AddProgress::Abort(err) => Some(ImportBlobEvent::Abort(err)),
52            _ => {
53                // @TODO: Add more event types
54                None
55            }
56        }
57    })
58}
59
60pub(crate) async fn import_blob_from_stream<S, T>(
61    store: S,
62    pool_handle: LocalPoolHandle,
63    data: T,
64) -> impl Stream<Item = ImportBlobEvent>
65where
66    T: Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
67    S: Store,
68{
69    let (sender, receiver) = async_channel::bounded(32);
70
71    let sender = sender.clone();
72    pool_handle.spawn_detached(|| async move {
73        if let Err(err) = add_from_stream(store, data, sender.clone()).await {
74            sender
75                .send(AddProgress::Abort(RpcError::new(&*err)))
76                .await
77                .ok();
78        }
79    });
80
81    receiver.filter_map(|event| {
82        match event {
83            AddProgress::AllDone { hash, .. } => {
84                Some(ImportBlobEvent::Done(Hash::from_bytes(*hash.as_bytes())))
85            }
86            // @TODO: Use own error type here
87            AddProgress::Abort(err) => Some(ImportBlobEvent::Abort(err)),
88            _ => {
89                // @TODO: Add more event types
90                None
91            }
92        }
93    })
94}
95
96async fn add_from_path<S: Store>(
97    store: S,
98    path: PathBuf,
99    progress: async_channel::Sender<AddProgress>,
100) -> Result<()> {
101    let progress = AsyncChannelProgressSender::new(progress);
102    let names = Arc::new(Mutex::new(BTreeMap::new()));
103
104    let import_progress = progress.clone().with_filter_map(move |x| match x {
105        ImportProgress::Found { id, name } => {
106            names.lock().unwrap().insert(id, name);
107            None
108        }
109        ImportProgress::Size { id, size } => {
110            let name = names.lock().unwrap().remove(&id)?;
111            Some(AddProgress::Found { id, name, size })
112        }
113        ImportProgress::OutboardProgress { id, offset } => {
114            Some(AddProgress::Progress { id, offset })
115        }
116        ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }),
117        _ => None,
118    });
119
120    let import_mode = ImportMode::default();
121    let (tag, _size) = store
122        .import_file(path, import_mode, BlobFormat::Raw, import_progress)
123        .await?;
124
125    let hash_and_format = tag.inner();
126    let HashAndFormat { hash, format } = *hash_and_format;
127    let tag = store.create_tag(*hash_and_format).await?;
128    progress
129        .send(AddProgress::AllDone { hash, format, tag })
130        .await?;
131
132    Ok(())
133}
134
135async fn add_from_stream<T, S>(
136    store: S,
137    data: T,
138    progress: async_channel::Sender<AddProgress>,
139) -> Result<()>
140where
141    T: Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
142    S: Store,
143{
144    let progress = AsyncChannelProgressSender::new(progress);
145    let names = Arc::new(Mutex::new(BTreeMap::new()));
146
147    let import_progress = progress.clone().with_filter_map(move |x| match x {
148        ImportProgress::Found { id, name } => {
149            names.lock().unwrap().insert(id, name);
150            None
151        }
152        ImportProgress::Size { id, size } => {
153            let name = names.lock().unwrap().remove(&id)?;
154            Some(AddProgress::Found { id, name, size })
155        }
156        ImportProgress::OutboardProgress { id, offset } => {
157            Some(AddProgress::Progress { id, offset })
158        }
159        ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }),
160        ImportProgress::CopyProgress { id, offset } => Some(AddProgress::Progress { id, offset }),
161    });
162
163    let (tag, _size) = store
164        .import_stream(data, BlobFormat::Raw, import_progress)
165        .await?;
166
167    let hash_and_format = tag.inner();
168    let HashAndFormat { hash, format } = *hash_and_format;
169    let tag = store.create_tag(*hash_and_format).await?;
170    progress
171        .send(AddProgress::AllDone { hash, format, tag })
172        .await?;
173
174    Ok(())
175}