1use std::collections::BTreeMap;
4use std::io;
5use std::path::PathBuf;
6use std::sync::{Arc, Mutex};
7
8use anyhow::Result;
9use bytes::Bytes;
10use futures_lite::StreamExt;
11use futures_util::Stream;
12use iroh_blobs::provider::AddProgress;
13use iroh_blobs::store::{ImportMode, ImportProgress, Store};
14use iroh_blobs::util::local_pool::LocalPoolHandle;
15use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
16use iroh_blobs::{BlobFormat, HashAndFormat};
17use p2panda_core::Hash;
18use serde::{Deserialize, Serialize};
19use serde_error::Error as RpcError;
20
21#[derive(Clone, Debug, Serialize, Deserialize)]
23pub enum ImportBlobEvent {
24 Done(Hash),
25 Abort(RpcError),
26}
27
28pub(crate) async fn import_blob<S: Store>(
29 store: S,
30 pool_handle: LocalPoolHandle,
31 path: PathBuf,
32) -> impl Stream<Item = ImportBlobEvent> {
33 let (sender, receiver) = async_channel::bounded(32);
34
35 let sender = sender.clone();
36 pool_handle.spawn_detached(|| async move {
37 if let Err(err) = add_from_path(store, path, sender.clone()).await {
38 sender
39 .send(AddProgress::Abort(RpcError::new(&*err)))
40 .await
41 .ok();
42 }
43 });
44
45 receiver.filter_map(|event| {
46 match event {
47 AddProgress::AllDone { hash, .. } => {
48 Some(ImportBlobEvent::Done(Hash::from_bytes(*hash.as_bytes())))
49 }
50 AddProgress::Abort(err) => Some(ImportBlobEvent::Abort(err)),
52 _ => {
53 None
55 }
56 }
57 })
58}
59
60pub(crate) async fn import_blob_from_stream<S, T>(
61 store: S,
62 pool_handle: LocalPoolHandle,
63 data: T,
64) -> impl Stream<Item = ImportBlobEvent>
65where
66 T: Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
67 S: Store,
68{
69 let (sender, receiver) = async_channel::bounded(32);
70
71 let sender = sender.clone();
72 pool_handle.spawn_detached(|| async move {
73 if let Err(err) = add_from_stream(store, data, sender.clone()).await {
74 sender
75 .send(AddProgress::Abort(RpcError::new(&*err)))
76 .await
77 .ok();
78 }
79 });
80
81 receiver.filter_map(|event| {
82 match event {
83 AddProgress::AllDone { hash, .. } => {
84 Some(ImportBlobEvent::Done(Hash::from_bytes(*hash.as_bytes())))
85 }
86 AddProgress::Abort(err) => Some(ImportBlobEvent::Abort(err)),
88 _ => {
89 None
91 }
92 }
93 })
94}
95
96async fn add_from_path<S: Store>(
97 store: S,
98 path: PathBuf,
99 progress: async_channel::Sender<AddProgress>,
100) -> Result<()> {
101 let progress = AsyncChannelProgressSender::new(progress);
102 let names = Arc::new(Mutex::new(BTreeMap::new()));
103
104 let import_progress = progress.clone().with_filter_map(move |x| match x {
105 ImportProgress::Found { id, name } => {
106 names.lock().unwrap().insert(id, name);
107 None
108 }
109 ImportProgress::Size { id, size } => {
110 let name = names.lock().unwrap().remove(&id)?;
111 Some(AddProgress::Found { id, name, size })
112 }
113 ImportProgress::OutboardProgress { id, offset } => {
114 Some(AddProgress::Progress { id, offset })
115 }
116 ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }),
117 _ => None,
118 });
119
120 let import_mode = ImportMode::default();
121 let (tag, _size) = store
122 .import_file(path, import_mode, BlobFormat::Raw, import_progress)
123 .await?;
124
125 let hash_and_format = tag.inner();
126 let HashAndFormat { hash, format } = *hash_and_format;
127 let tag = store.create_tag(*hash_and_format).await?;
128 progress
129 .send(AddProgress::AllDone { hash, format, tag })
130 .await?;
131
132 Ok(())
133}
134
135async fn add_from_stream<T, S>(
136 store: S,
137 data: T,
138 progress: async_channel::Sender<AddProgress>,
139) -> Result<()>
140where
141 T: Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
142 S: Store,
143{
144 let progress = AsyncChannelProgressSender::new(progress);
145 let names = Arc::new(Mutex::new(BTreeMap::new()));
146
147 let import_progress = progress.clone().with_filter_map(move |x| match x {
148 ImportProgress::Found { id, name } => {
149 names.lock().unwrap().insert(id, name);
150 None
151 }
152 ImportProgress::Size { id, size } => {
153 let name = names.lock().unwrap().remove(&id)?;
154 Some(AddProgress::Found { id, name, size })
155 }
156 ImportProgress::OutboardProgress { id, offset } => {
157 Some(AddProgress::Progress { id, offset })
158 }
159 ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }),
160 ImportProgress::CopyProgress { id, offset } => Some(AddProgress::Progress { id, offset }),
161 });
162
163 let (tag, _size) = store
164 .import_stream(data, BlobFormat::Raw, import_progress)
165 .await?;
166
167 let hash_and_format = tag.inner();
168 let HashAndFormat { hash, format } = *hash_and_format;
169 let tag = store.create_tag(*hash_and_format).await?;
170 progress
171 .send(AddProgress::AllDone { hash, format, tag })
172 .await?;
173
174 Ok(())
175}