leaf_protocol/store/
iroh.rs

1use std::{
2    io::{Cursor, Read, Write},
3    sync::Arc,
4};
5
6use borsh::{BorshDeserialize, BorshSerialize};
7use futures::TryStreamExt;
8use iroh::{
9    base::node_addr::AddrInfoOptions,
10    docs::{store::Query, Author, AuthorId, Capability, NamespaceSecret},
11};
12use once_cell::sync::Lazy;
13
14use crate::{
15    store::LeafStore,
16    types::{EntityPath, NamespaceSecretKey, PathSegment, SubspaceId},
17    Digest, ExactLink,
18};
19
20pub type LeafIroh = crate::Leaf<LeafIrohStore>;
21
22pub const LEAF_GC_PREFIX_STR: &str = "_leaf_gc_";
23pub static LEAF_GC_PREFIX: Lazy<PathSegment> =
24    Lazy::new(|| PathSegment::String(LEAF_GC_PREFIX_STR.into()));
25
26#[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
27pub struct LeafGcPathPrefix {
28    pub leaf_gc_prefix_str: String,
29    pub subspace: [u8; 32],
30    pub entity_path: Vec<PathSegment>,
31    pub entity_snapshot_id: Digest,
32}
33
34impl LeafGcPathPrefix {
35    pub fn new(link: &ExactLink, entity_snapshot_id: Digest) -> Self {
36        LeafGcPathPrefix {
37            leaf_gc_prefix_str: LEAF_GC_PREFIX_STR.into(),
38            subspace: link.subspace,
39            entity_path: link.path.0.clone(),
40            entity_snapshot_id,
41        }
42    }
43    pub fn to_bytes(&self) -> Vec<u8> {
44        let mut buf = Vec::new();
45        self.serialize(&mut buf).unwrap();
46        buf
47    }
48}
49
50#[derive(borsh::BorshSerialize, borsh::BorshDeserialize)]
51pub struct LeafGcPath {
52    pub prefix: LeafGcPathPrefix,
53    pub entity_snapshot_id_plus_blob_hash: Digest,
54}
55
56impl std::fmt::Debug for LeafGcPath {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        write!(f, "LeafGCPath(")?;
59        f.debug_list()
60            .entry(&self.prefix.leaf_gc_prefix_str)
61            .entry(&iroh::base::base32::fmt(self.prefix.subspace))
62            .entries(&self.prefix.entity_path)
63            .entry(&self.prefix.entity_snapshot_id)
64            .entry(&self.entity_snapshot_id_plus_blob_hash)
65            .finish()?;
66        write!(f, ")")
67    }
68}
69
70impl LeafGcPath {
71    pub fn new(link: &ExactLink, entity_snapshot_id: Digest, blob_hash: Digest) -> Self {
72        let entity_snapshot_id_plus_blob_hash =
73            Digest::new(&[*entity_snapshot_id.as_bytes(), *blob_hash.as_bytes()].concat());
74        Self {
75            prefix: LeafGcPathPrefix::new(link, entity_snapshot_id),
76            entity_snapshot_id_plus_blob_hash,
77        }
78    }
79    pub fn to_bytes(&self) -> Vec<u8> {
80        let mut buf = Vec::new();
81        self.serialize(&mut buf).unwrap();
82        buf
83    }
84    pub fn from_bytes(mut bytes: &[u8]) -> std::io::Result<Self> {
85        Self::deserialize(&mut bytes)
86    }
87}
88
89#[derive(Debug, Clone)]
90pub struct LeafIrohStore {
91    pub client: iroh::client::Iroh,
92    pub docs: Arc<quick_cache::sync::Cache<iroh::docs::NamespaceId, iroh::client::Doc>>,
93}
94pub struct IrohDocumentKeyFormat {
95    pub path: Vec<PathSegment>,
96}
97impl IrohDocumentKeyFormat {
98    pub fn new(path: Vec<PathSegment>) -> Self {
99        Self { path }
100    }
101
102    pub fn to_bytes(&self) -> Vec<u8> {
103        let mut buf = Vec::new();
104        let mut segment_bytes = Vec::new();
105        for segment in &self.path {
106            segment.serialize(&mut segment_bytes).unwrap();
107            let len: u32 = segment_bytes.len().try_into().unwrap();
108            buf.write_all(&len.to_le_bytes()[..]).unwrap();
109            buf.write_all(&segment_bytes).unwrap();
110            segment_bytes.clear();
111        }
112        buf.push(0); // Add the null trailing byte to make sure it doesn't trigger a prefix deletion.
113        buf
114    }
115
116    pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
117        let last_byte_idx = bytes.len() - 1;
118        if bytes[last_byte_idx] != 0 {
119            anyhow::bail!("Expected null terminating byte.")
120        }
121        let bytes = &bytes[0..last_byte_idx];
122        let len = bytes.len();
123        if len == 0 {
124            return Ok(Self { path: Vec::new() });
125        }
126        let mut reader = Cursor::new(bytes);
127        let mut path = Vec::new();
128        let mut segment_bytes = Vec::new();
129        loop {
130            if reader.position() as usize == len {
131                break;
132            }
133            let mut segment_len_bytes = [0u8; 4];
134            reader.read_exact(&mut segment_len_bytes)?;
135            let segment_len = u32::from_le_bytes(segment_len_bytes);
136
137            segment_bytes.extend(std::iter::repeat(0u8).take(segment_len as _));
138            reader.read_exact(&mut segment_bytes)?;
139
140            path.push(PathSegment::deserialize(&mut &segment_bytes[..])?);
141            segment_bytes.clear();
142        }
143        Ok(Self { path })
144    }
145}
146
147impl LeafIrohStore {
148    pub fn new(client: iroh::client::Iroh) -> Self {
149        Self {
150            client,
151            docs: Arc::new(quick_cache::sync::Cache::new(10)),
152        }
153    }
154
155    /// Open a document using the local document cache.
156    pub async fn open(&self, ns: iroh::docs::NamespaceId) -> anyhow::Result<iroh::client::Doc> {
157        self.docs
158            .get_or_insert_async(&ns, async {
159                self.client
160                    .docs()
161                    .import_namespace(iroh::docs::Capability::Read(ns))
162                    .await
163            })
164            .await
165    }
166
167    pub fn get_entity_key(subspace: SubspaceId, path: &[PathSegment]) -> Vec<u8> {
168        assert_ne!(
169            path.first(),
170            Some(&*LEAF_GC_PREFIX),
171            "Cannot write entities to reserved prefix: {}",
172            LEAF_GC_PREFIX_STR
173        );
174        let mut path = path.to_vec();
175        path.insert(0, PathSegment::Bytes(subspace.to_vec()));
176        IrohDocumentKeyFormat::new(path).to_bytes()
177    }
178}
179
180impl LeafStore for LeafIrohStore {
181    fn key_resolvers(&self) -> Box<dyn Iterator<Item = &dyn super::KeyResolverImpl<Digest>> + '_> {
182        Box::new([].into_iter())
183    }
184
185    fn encryption_algorithms(
186        &self,
187    ) -> Box<dyn Iterator<Item = &dyn super::EncryptionAlgorithmImpl<Digest>> + '_> {
188        Box::new([].into_iter())
189    }
190
191    async fn store_blob(
192        &self,
193        data: &[u8],
194        link: &ExactLink,
195        entity_snapshot_id: Digest,
196    ) -> anyhow::Result<Digest> {
197        let doc = self.open(link.namespace.into()).await?;
198        let hash = self.client.blobs().add_bytes(data.to_vec()).await?.hash;
199
200        let key = LeafGcPath::new(link, entity_snapshot_id, Digest(hash));
201        let doc_key = key.to_bytes();
202        let author_id = self.client.authors().default().await?;
203        doc.set_hash(author_id, doc_key, hash, data.len() as u64)
204            .await?;
205        Ok(Digest(hash))
206    }
207
208    async fn del_blobs(
209        &self,
210        link: &ExactLink,
211        entity_snapshot_id: Digest,
212    ) -> anyhow::Result<usize> {
213        let doc = self.open(link.namespace.into()).await?;
214
215        let path_prefix = LeafGcPathPrefix::new(link, entity_snapshot_id).to_bytes();
216        let author_id = self.client.authors().default().await?;
217        let deleted = doc.del(author_id, path_prefix).await?;
218
219        Ok(deleted)
220    }
221
222    async fn get_blob(&self, digest: Digest) -> anyhow::Result<Vec<u8>> {
223        Ok(self.client.blobs().read_to_bytes(digest.0).await?.to_vec())
224    }
225
226    async fn store_entity(&self, link: &ExactLink, data: Vec<u8>) -> anyhow::Result<Digest> {
227        let doc = self.open(link.namespace.into()).await?;
228        let key = Self::get_entity_key(link.subspace, &link.path.0);
229        let digest = doc.set_bytes(link.subspace.into(), key, data).await?;
230        Ok(Digest(digest))
231    }
232    async fn del_entity(&self, link: &ExactLink) -> anyhow::Result<()> {
233        let doc = self.open(link.namespace.into()).await?;
234        let key = Self::get_entity_key(link.subspace, &link.path.0);
235        doc.del(link.subspace.into(), key).await?;
236        Ok(())
237    }
238
239    async fn get_entity(&self, link: &ExactLink) -> anyhow::Result<Option<Digest>> {
240        let doc = self.open(link.namespace.into()).await?;
241        let key = Self::get_entity_key(link.subspace, &link.path.0);
242        let entity = doc.get_exact(link.subspace.into(), key, false).await?;
243        let entity = entity.map(|entry| Digest(entry.content_hash()));
244        Ok(entity)
245    }
246
247    async fn list(
248        &self,
249        link: ExactLink,
250        limit: Option<u64>,
251        offset: Option<u64>,
252    ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<ExactLink>>> {
253        let link = link.clone();
254        let doc = self.open(link.namespace.into()).await?;
255
256        let mut path = vec![PathSegment::Bytes(link.subspace.to_vec())];
257        path.extend(link.path.0.iter().cloned());
258        let path_bytes = IrohDocumentKeyFormat::new(path).to_bytes();
259        // Remove the null terminator so that we find all of the children of this path
260        let path_bytes = &path_bytes[0..(path_bytes.len() - 1)];
261
262        let mut query = Query::key_prefix(path_bytes).author(link.subspace.into());
263        if let Some(limit) = limit {
264            query = query.limit(limit);
265        }
266        if let Some(offset) = offset {
267            query = query.limit(offset);
268        }
269        let stream = doc.get_many(query).await?;
270
271        let s = stream.and_then(move |x| async move {
272            let mut key = IrohDocumentKeyFormat::from_bytes(x.key())?;
273            key.path.remove(0); // Remove the subspace path segment
274
275            Ok(ExactLink {
276                namespace: link.namespace,
277                subspace: link.subspace,
278                path: EntityPath(key.path),
279            })
280        });
281
282        Ok(s)
283    }
284
285    async fn create_subspace(&self) -> anyhow::Result<SubspaceId> {
286        let author = self.client.authors().create().await?;
287        Ok(*author.as_bytes())
288    }
289
290    async fn import_subspace_secret(&self, author_secret: [u8; 32]) -> anyhow::Result<SubspaceId> {
291        let author = Author::from_bytes(&author_secret);
292        let id = *author.public_key().as_bytes();
293        self.client.authors().import(author).await?;
294        Ok(id)
295    }
296
297    async fn get_subspace_secret(
298        &self,
299        author: SubspaceId,
300    ) -> anyhow::Result<Option<crate::prelude::SubspaceSecretKey>> {
301        let author = self.client.authors().export(AuthorId::from(author)).await?;
302        Ok(author.map(|x| x.to_bytes()))
303    }
304
305    async fn create_namespace(&self) -> anyhow::Result<crate::prelude::NamespaceId> {
306        let doc = self.client.docs().create().await?;
307        Ok(doc.id().to_bytes())
308    }
309
310    async fn import_namespace_secret(
311        &self,
312        namespace_secret: [u8; 32],
313    ) -> anyhow::Result<crate::prelude::NamespaceId> {
314        let secret = NamespaceSecret::from_bytes(&namespace_secret);
315        let id = *secret.id().as_bytes();
316        self.client
317            .docs()
318            .import_namespace(iroh::docs::Capability::Write(secret))
319            .await?;
320        Ok(id)
321    }
322
323    async fn get_namespace_secret(
324        &self,
325        namespace: crate::prelude::NamespaceId,
326    ) -> anyhow::Result<Option<NamespaceSecretKey>> {
327        let doc = self.open(namespace.into()).await?;
328        let capability = doc
329            .share(iroh::client::docs::ShareMode::Write, AddrInfoOptions::Id)
330            .await
331            .map(|x| x.capability)
332            .ok()
333            .and_then(|x| {
334                if let Capability::Write(secret) = x {
335                    Some(secret.to_bytes())
336                } else {
337                    None
338                }
339            });
340        Ok(capability)
341    }
342
343    async fn list_subspaces(
344        &self,
345    ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<SubspaceId>>> {
346        Ok(self
347            .client
348            .authors()
349            .list()
350            .await?
351            .map_ok(move |id| *id.as_bytes()))
352    }
353
354    async fn list_namespaces(
355        &self,
356    ) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<leaf_protocol_types::NamespaceId>>>
357    {
358        Ok(self
359            .client
360            .docs()
361            .list()
362            .await?
363            .map_ok(move |(id, _)| *id.as_bytes()))
364    }
365}