1use std::{
2 io::{Cursor, Read, Write},
3 sync::Arc,
4};
5
6use borsh::{BorshDeserialize, BorshSerialize};
7use futures::TryStreamExt;
8use iroh::{
9 base::node_addr::AddrInfoOptions,
10 docs::{store::Query, Author, AuthorId, Capability, NamespaceSecret},
11};
12use once_cell::sync::Lazy;
13
14use crate::{
15 store::LeafStore,
16 types::{EntityPath, NamespaceSecretKey, PathSegment, SubspaceId},
17 Digest, ExactLink,
18};
19
20pub type LeafIroh = crate::Leaf<LeafIrohStore>;
21
22pub const LEAF_GC_PREFIX_STR: &str = "_leaf_gc_";
23pub static LEAF_GC_PREFIX: Lazy<PathSegment> =
24 Lazy::new(|| PathSegment::String(LEAF_GC_PREFIX_STR.into()));
25
26#[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
27pub struct LeafGcPathPrefix {
28 pub leaf_gc_prefix_str: String,
29 pub subspace: [u8; 32],
30 pub entity_path: Vec<PathSegment>,
31 pub entity_snapshot_id: Digest,
32}
33
34impl LeafGcPathPrefix {
35 pub fn new(link: &ExactLink, entity_snapshot_id: Digest) -> Self {
36 LeafGcPathPrefix {
37 leaf_gc_prefix_str: LEAF_GC_PREFIX_STR.into(),
38 subspace: link.subspace,
39 entity_path: link.path.0.clone(),
40 entity_snapshot_id,
41 }
42 }
43 pub fn to_bytes(&self) -> Vec<u8> {
44 let mut buf = Vec::new();
45 self.serialize(&mut buf).unwrap();
46 buf
47 }
48}
49
50#[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
51pub struct LeafGcPath {
52 pub prefix: LeafGcPathPrefix,
53 pub entity_snapshot_id_plus_blob_hash: Digest,
54}
55
56impl std::fmt::Debug for LeafGcPath {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 write!(f, "LeafGCPath(")?;
59 f.debug_list()
60 .entry(&self.prefix.leaf_gc_prefix_str)
61 .entry(&iroh::base::base32::fmt(self.prefix.subspace))
62 .entries(&self.prefix.entity_path)
63 .entry(&self.prefix.entity_snapshot_id)
64 .entry(&self.entity_snapshot_id_plus_blob_hash)
65 .finish()?;
66 write!(f, ")")
67 }
68}
69
70impl LeafGcPath {
71 pub fn new(link: &ExactLink, entity_snapshot_id: Digest, blob_hash: Digest) -> Self {
72 let entity_snapshot_id_plus_blob_hash =
73 Digest::new(&[*entity_snapshot_id.as_bytes(), *blob_hash.as_bytes()].concat());
74 Self {
75 prefix: LeafGcPathPrefix::new(link, entity_snapshot_id),
76 entity_snapshot_id_plus_blob_hash,
77 }
78 }
79 pub fn to_bytes(&self) -> Vec<u8> {
80 let mut buf = Vec::new();
81 self.serialize(&mut buf).unwrap();
82 buf
83 }
84 pub fn from_bytes(mut bytes: &[u8]) -> std::io::Result<Self> {
85 Self::deserialize(&mut bytes)
86 }
87}
88
89#[derive(Debug, Clone)]
90pub struct LeafIrohStore {
91 pub client: iroh::client::Iroh,
92 pub docs: Arc<quick_cache::sync::Cache<iroh::docs::NamespaceId, iroh::client::Doc>>,
93}
94pub struct IrohDocumentKeyFormat {
95 pub path: Vec<PathSegment>,
96}
97impl IrohDocumentKeyFormat {
98 pub fn new(path: Vec<PathSegment>) -> Self {
99 Self { path }
100 }
101
102 pub fn to_bytes(&self) -> Vec<u8> {
103 let mut buf = Vec::new();
104 let mut segment_bytes = Vec::new();
105 for segment in &self.path {
106 segment.serialize(&mut segment_bytes).unwrap();
107 let len: u32 = segment_bytes.len().try_into().unwrap();
108 buf.write_all(&len.to_le_bytes()[..]).unwrap();
109 buf.write_all(&segment_bytes).unwrap();
110 segment_bytes.clear();
111 }
112 buf.push(0); buf
114 }
115
116 pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
117 let last_byte_idx = bytes.len() - 1;
118 if bytes[last_byte_idx] != 0 {
119 anyhow::bail!("Expected null terminating byte.")
120 }
121 let bytes = &bytes[0..last_byte_idx];
122 let len = bytes.len();
123 if len == 0 {
124 return Ok(Self { path: Vec::new() });
125 }
126 let mut reader = Cursor::new(bytes);
127 let mut path = Vec::new();
128 let mut segment_bytes = Vec::new();
129 loop {
130 if reader.position() as usize == len {
131 break;
132 }
133 let mut segment_len_bytes = [0u8; 4];
134 reader.read_exact(&mut segment_len_bytes)?;
135 let segment_len = u32::from_le_bytes(segment_len_bytes);
136
137 segment_bytes.extend(std::iter::repeat(0u8).take(segment_len as _));
138 reader.read_exact(&mut segment_bytes)?;
139
140 path.push(PathSegment::deserialize(&mut &segment_bytes[..])?);
141 segment_bytes.clear();
142 }
143 Ok(Self { path })
144 }
145}
146
147impl LeafIrohStore {
148 pub fn new(client: iroh::client::Iroh) -> Self {
149 Self {
150 client,
151 docs: Arc::new(quick_cache::sync::Cache::new(10)),
152 }
153 }
154
155 pub async fn open(&self, ns: iroh::docs::NamespaceId) -> anyhow::Result<iroh::client::Doc> {
157 self.docs
158 .get_or_insert_async(&ns, async {
159 self.client
160 .docs()
161 .import_namespace(iroh::docs::Capability::Read(ns))
162 .await
163 })
164 .await
165 }
166
167 pub fn get_entity_key(subspace: SubspaceId, path: &[PathSegment]) -> Vec<u8> {
168 assert_ne!(
169 path.first(),
170 Some(&*LEAF_GC_PREFIX),
171 "Cannot write entities to reserved prefix: {}",
172 LEAF_GC_PREFIX_STR
173 );
174 let mut path = path.to_vec();
175 path.insert(0, PathSegment::Bytes(subspace.to_vec()));
176 IrohDocumentKeyFormat::new(path).to_bytes()
177 }
178}
179
180impl LeafStore for LeafIrohStore {
181 fn key_resolvers(&self) -> Box<dyn Iterator<Item = &dyn super::KeyResolverImpl<Digest>> + '_> {
182 Box::new([].into_iter())
183 }
184
185 fn encryption_algorithms(
186 &self,
187 ) -> Box<dyn Iterator<Item = &dyn super::EncryptionAlgorithmImpl<Digest>> + '_> {
188 Box::new([].into_iter())
189 }
190
191 async fn store_blob(
192 &self,
193 data: &[u8],
194 link: &ExactLink,
195 entity_snapshot_id: Digest,
196 ) -> anyhow::Result<Digest> {
197 let doc = self.open(link.namespace.into()).await?;
198 let hash = self.client.blobs().add_bytes(data.to_vec()).await?.hash;
199
200 let key = LeafGcPath::new(link, entity_snapshot_id, Digest(hash));
201 let doc_key = key.to_bytes();
202 let author_id = self.client.authors().default().await?;
203 doc.set_hash(author_id, doc_key, hash, data.len() as u64)
204 .await?;
205 Ok(Digest(hash))
206 }
207
208 async fn del_blobs(
209 &self,
210 link: &ExactLink,
211 entity_snapshot_id: Digest,
212 ) -> anyhow::Result<usize> {
213 let doc = self.open(link.namespace.into()).await?;
214
215 let path_prefix = LeafGcPathPrefix::new(link, entity_snapshot_id).to_bytes();
216 let author_id = self.client.authors().default().await?;
217 let deleted = doc.del(author_id, path_prefix).await?;
218
219 Ok(deleted)
220 }
221
222 async fn get_blob(&self, digest: Digest) -> anyhow::Result<Vec<u8>> {
223 Ok(self.client.blobs().read_to_bytes(digest.0).await?.to_vec())
224 }
225
226 async fn store_entity(&self, link: &ExactLink, data: Vec<u8>) -> anyhow::Result<Digest> {
227 let doc = self.open(link.namespace.into()).await?;
228 let key = Self::get_entity_key(link.subspace, &link.path.0);
229 let digest = doc.set_bytes(link.subspace.into(), key, data).await?;
230 Ok(Digest(digest))
231 }
232 async fn del_entity(&self, link: &ExactLink) -> anyhow::Result<()> {
233 let doc = self.open(link.namespace.into()).await?;
234 let key = Self::get_entity_key(link.subspace, &link.path.0);
235 doc.del(link.subspace.into(), key).await?;
236 Ok(())
237 }
238
239 async fn get_entity(&self, link: &ExactLink) -> anyhow::Result<Option<Digest>> {
240 let doc = self.open(link.namespace.into()).await?;
241 let key = Self::get_entity_key(link.subspace, &link.path.0);
242 let entity = doc.get_exact(link.subspace.into(), key, false).await?;
243 let entity = entity.map(|entry| Digest(entry.content_hash()));
244 Ok(entity)
245 }
246
247 async fn list(
248 &self,
249 link: ExactLink,
250 limit: Option<u64>,
251 offset: Option<u64>,
252 ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<ExactLink>>> {
253 let link = link.clone();
254 let doc = self.open(link.namespace.into()).await?;
255
256 let mut path = vec![PathSegment::Bytes(link.subspace.to_vec())];
257 path.extend(link.path.0.iter().cloned());
258 let path_bytes = IrohDocumentKeyFormat::new(path).to_bytes();
259 let path_bytes = &path_bytes[0..(path_bytes.len() - 1)];
261
262 let mut query = Query::key_prefix(path_bytes).author(link.subspace.into());
263 if let Some(limit) = limit {
264 query = query.limit(limit);
265 }
266 if let Some(offset) = offset {
267 query = query.limit(offset);
268 }
269 let stream = doc.get_many(query).await?;
270
271 let s = stream.and_then(move |x| async move {
272 let mut key = IrohDocumentKeyFormat::from_bytes(x.key())?;
273 key.path.remove(0); Ok(ExactLink {
276 namespace: link.namespace,
277 subspace: link.subspace,
278 path: EntityPath(key.path),
279 })
280 });
281
282 Ok(s)
283 }
284
285 async fn create_subspace(&self) -> anyhow::Result<SubspaceId> {
286 let author = self.client.authors().create().await?;
287 Ok(*author.as_bytes())
288 }
289
290 async fn import_subspace_secret(&self, author_secret: [u8; 32]) -> anyhow::Result<SubspaceId> {
291 let author = Author::from_bytes(&author_secret);
292 let id = *author.public_key().as_bytes();
293 self.client.authors().import(author).await?;
294 Ok(id)
295 }
296
297 async fn get_subspace_secret(
298 &self,
299 author: SubspaceId,
300 ) -> anyhow::Result<Option<crate::prelude::SubspaceSecretKey>> {
301 let author = self.client.authors().export(AuthorId::from(author)).await?;
302 Ok(author.map(|x| x.to_bytes()))
303 }
304
305 async fn create_namespace(&self) -> anyhow::Result<crate::prelude::NamespaceId> {
306 let doc = self.client.docs().create().await?;
307 Ok(doc.id().to_bytes())
308 }
309
310 async fn import_namespace_secret(
311 &self,
312 namespace_secret: [u8; 32],
313 ) -> anyhow::Result<crate::prelude::NamespaceId> {
314 let secret = NamespaceSecret::from_bytes(&namespace_secret);
315 let id = *secret.id().as_bytes();
316 self.client
317 .docs()
318 .import_namespace(iroh::docs::Capability::Write(secret))
319 .await?;
320 Ok(id)
321 }
322
323 async fn get_namespace_secret(
324 &self,
325 namespace: crate::prelude::NamespaceId,
326 ) -> anyhow::Result<Option<NamespaceSecretKey>> {
327 let doc = self.open(namespace.into()).await?;
328 let capability = doc
329 .share(iroh::client::docs::ShareMode::Write, AddrInfoOptions::Id)
330 .await
331 .map(|x| x.capability)
332 .ok()
333 .and_then(|x| {
334 if let Capability::Write(secret) = x {
335 Some(secret.to_bytes())
336 } else {
337 None
338 }
339 });
340 Ok(capability)
341 }
342
343 async fn list_subspaces(
344 &self,
345 ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<SubspaceId>>> {
346 Ok(self
347 .client
348 .authors()
349 .list()
350 .await?
351 .map_ok(move |id| *id.as_bytes()))
352 }
353
354 async fn list_namespaces(
355 &self,
356 ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<leaf_protocol_types::NamespaceId>>>
357 {
358 Ok(self
359 .client
360 .docs()
361 .list()
362 .await?
363 .map_ok(move |(id, _)| *id.as_bytes()))
364 }
365}