leaf_protocol/
lib.rs

1//! Rust implementation of the [Leaf Protocol draft][lp].
2//!
3//! [lp]: https://github.com/muni-town/agentic-fediverse/blob/49791e6b3ec1df5e0a8604476417e88eed1f9497/leaf-protocol-draft.md
4
5pub mod components;
6pub mod store;
7pub use leaf_protocol_types as types;
8use leaf_protocol_types::Digest;
9
10use anyhow::Result;
11use borsh::{BorshDeserialize, BorshSerialize};
12
13pub use borsh;
14
15use futures::stream::Stream;
16pub use leaf_protocol_macros::*;
17use store::LeafStore;
18use types::{
19    ComponentData, ComponentEntry, ComponentKind, Entity, ExactLink, NamespaceId,
20    NamespaceSecretKey, SubspaceId, SubspaceSecretKey,
21};
22
23#[cfg(feature = "backend_iroh")]
24pub use iroh;
25
26pub mod prelude {
27    pub use crate::components::*;
28    #[cfg(feature = "backend_iroh")]
29    pub use crate::store::iroh::*;
30    pub use crate::store::{EncryptionAlgorithmImpl, KeyResolverImpl, LeafStore};
31    pub use crate::types::*;
32    pub use crate::*;
33    pub use borsh::{BorshDeserialize, BorshSerialize};
34}
35
36/// Trait implemented by `Component`s in the Leaf data model.
37///
38/// Implementers should usually derive this using the [`Component`][leaf_protocol_macros::Component]
39/// macro.
40pub trait Component: types::HasBorshSchema + BorshDeserialize + BorshSerialize {
41    /// Returns the digest of the schema for this component.
42    fn schema_id() -> Digest;
43    fn make_data(&self) -> std::io::Result<ComponentData> {
44        let mut data = Vec::new();
45        self.serialize(&mut data)?;
46        Ok(ComponentData {
47            schema: Self::schema_id(),
48            data,
49        })
50    }
51}
52
53/// The leaf store, the entrypoint to the leaf API.
54#[derive(Clone, Debug)]
55pub struct Leaf<Store: LeafStore> {
56    /// The backend store.
57    pub store: Store,
58}
59
60#[derive(Debug)]
61pub enum EntityEntry<S: LeafStore> {
62    Entity(LoadedEntity<S>),
63    Empty { link: ExactLink, store: S },
64}
65
66impl<S: LeafStore> EntityEntry<S> {
67    /// Get the entity at this entry.
68    pub fn entity(self) -> Result<LoadedEntity<S>> {
69        match self {
70            EntityEntry::Entity(e) => Ok(e),
71            EntityEntry::Empty { link, .. } => {
72                Err(anyhow::format_err!("Entity does not exist at: {link:?}"))
73            }
74        }
75    }
76
77    /// Get the retrieved entity or initialize an empty one at this link.
78    pub fn get_or_init(self) -> LoadedEntity<S> {
79        match self {
80            EntityEntry::Entity(e) => e,
81            EntityEntry::Empty { link, store } => LoadedEntity {
82                store,
83                link,
84                entity: Entity::default(),
85                digest: Digest::from_bytes([0; 32]),
86                pending_components: Default::default(),
87            },
88        }
89    }
90}
91
92#[derive(Debug)]
93pub struct LoadedEntity<S: LeafStore> {
94    pub store: S,
95    pub link: ExactLink,
96    pub entity: Entity,
97    /// The digest of the entity. This may be a null digest if you have just called
98    /// [`get_or_init()`][EntityEntry::get_or_init], and the digest has not computed yet.
99    ///
100    /// It will also not be updated until [`save()`][LoadedEntity::save] is called when making
101    /// changes to the entity.
102    pub digest: Digest,
103    /// The list of components that have been added to the entity, but haven't been written to
104    /// storage yet.
105    pub pending_components: Vec<ComponentKind>,
106}
107
108impl<S: LeafStore> LoadedEntity<S> {
109    /// Delete all components of the given type.
110    ///
111    /// The changes will not be persisted until [`save()`][Self::save] is called.
112    pub fn del_components<C: Component>(&mut self) {
113        self.del_components_by_schema(C::schema_id());
114    }
115
116    pub fn del_components_by_schema(&mut self, schema: Digest) {
117        self.entity
118            .components
119            .retain(|entry| entry.schema_id != Some(schema));
120        self.pending_components.retain(|kind| {
121            kind.unencrypted()
122                .map(|x| x.schema != schema)
123                .unwrap_or(true)
124        });
125    }
126
127    /// Get the first component of a given type on the entity, or [`None`] if there is no component
128    /// of that type.
129    pub async fn get_component<C: Component>(&self) -> Result<Option<C>> {
130        for comp in &self.pending_components {
131            if let Some(comp) = comp.unencrypted() {
132                if comp.schema == C::schema_id() {
133                    return Ok(Some(C::deserialize(&mut &comp.data[..])?));
134                }
135            }
136        }
137        for entry in &self.entity.components {
138            if entry.schema_id == Some(C::schema_id()) {
139                let data = self.store.get_blob(entry.component_id).await?;
140                return Ok(Some(C::deserialize(&mut &data[..])?));
141            }
142        }
143        Ok(None)
144    }
145
146    /// Get all components of the given type on the entity.
147    pub async fn get_components<C: Component>(&self) -> Result<Vec<C>> {
148        let mut res = Vec::new();
149        for comp in &self.pending_components {
150            if let Some(comp) = comp.unencrypted() {
151                if comp.schema == C::schema_id() {
152                    res.push(C::deserialize(&mut &comp.data[..])?);
153                }
154            }
155        }
156        for entry in &self.entity.components {
157            if entry.schema_id == Some(C::schema_id()) {
158                let data = self.store.get_blob(entry.component_id).await?;
159                res.push(C::deserialize(&mut &data[..])?);
160            }
161        }
162        Ok(res)
163    }
164
165    pub async fn get_components_by_schema(&self, schema: Digest) -> Result<Vec<Vec<u8>>> {
166        let mut res = Vec::new();
167        for comp in &self.pending_components {
168            if let Some(comp) = comp.unencrypted() {
169                if comp.schema == schema {
170                    res.push(comp.data.clone());
171                }
172            }
173        }
174        for entry in &self.entity.components {
175            if entry.schema_id == Some(schema) {
176                let data = self.store.get_blob(entry.component_id).await?;
177                let component_kind = ComponentKind::deserialize(&mut &data[..])?;
178                if let ComponentKind::Unencrypted(data) = component_kind {
179                    if schema == data.schema {
180                        res.push(data.data);
181                    }
182                }
183            }
184        }
185        Ok(res)
186    }
187
188    /// Remove all components of the same type that are already on the entity, and then add the
189    /// component to the entity.
190    ///
191    /// This is similar to [`add_component()`][Self::add_component], but it makes sure that after
192    /// it's done there is only one component of the given type on the entity.
193    ///
194    /// The change will not be persisted until [`save()`][Self::save] is called.
195    pub fn set_component<C: Component>(&mut self, data: C) -> Result<()> {
196        self.del_components::<C>();
197        self.add_component(data)?;
198        Ok(())
199    }
200
201    /// Add a component to the entity.
202    ///
203    /// The component will not be persisted until [`save()`][Self::save] is called.
204    pub fn add_component<C: Component>(&mut self, data: C) -> Result<()> {
205        self.pending_components
206            .push(ComponentKind::Unencrypted(ComponentData {
207                schema: C::schema_id(),
208                data: {
209                    let mut buf = Vec::new();
210                    data.serialize(&mut buf)?;
211                    buf
212                },
213            }));
214        Ok(())
215    }
216
217    pub fn add_component_data(&mut self, data: ComponentKind) {
218        self.pending_components.push(data);
219    }
220
221    /// Persist updates made to this entity's components, writing updated entity and components to
222    /// the store.
223    pub async fn save(&mut self) -> anyhow::Result<()> {
224        // Clean up old blob pins if there was a previous version of this entity.
225        if let Some(old_snapshot_id) = self.store.get_entity(&self.link).await? {
226            self.store.del_blobs(&self.link, old_snapshot_id).await?;
227        }
228
229        struct PendingComponent {
230            schema: Option<Digest>,
231            data_hash: Digest,
232            data: Vec<u8>,
233        }
234        let mut pending_components =
235            Vec::with_capacity(self.pending_components.len() + self.entity.components.len());
236
237        for component in &self.pending_components {
238            let mut buf = Vec::new();
239            component.serialize(&mut buf)?;
240            let digest = Digest::new(&buf);
241            pending_components.push(PendingComponent {
242                schema: component.unencrypted().map(|x| x.schema),
243                data: buf,
244                data_hash: digest,
245            });
246        }
247
248        let mut new_entity_snapshot = Entity::default();
249        new_entity_snapshot
250            .components
251            .extend(self.entity.components.iter().cloned());
252        new_entity_snapshot
253            .components
254            .extend(pending_components.iter().map(|x| ComponentEntry {
255                schema_id: x.schema,
256                component_id: x.data_hash,
257            }));
258        new_entity_snapshot.components.sort();
259        new_entity_snapshot.components.dedup();
260        let mut new_entity_snapshot_buf = Vec::new();
261        new_entity_snapshot.serialize(&mut new_entity_snapshot_buf)?;
262
263        let new_entity_snapshot_id = Digest::new(&new_entity_snapshot_buf);
264
265        for comp in pending_components {
266            // TODO: Make sure that snapshots and link snapshots are added to the blob store pins.
267            //
268            // The component may contain a `Snapshot` or a `Link` with a snapshot that should be
269            // persisted. As it stands, nothing will make sure that the snapshot's blob is
270            // persisted. We need to add another `store_blob` call for each snapshot.
271            //
272            // We can tell when a component contains a snapashot by combing it's `BorshSchema`, but
273            // we must also walk through the component bytes as we recurse through the schema so
274            // that when we find a link we can load it's bytes.
275            let dig = self
276                .store
277                .store_blob(&comp.data, &self.link, new_entity_snapshot_id)
278                .await?;
279            assert_eq!(dig, comp.data_hash);
280        }
281        let verification_digest = self
282            .store
283            .store_entity(&self.link, new_entity_snapshot_buf)
284            .await?;
285        assert_eq!(
286            verification_digest, new_entity_snapshot_id,
287            "Entity snapshot digest incorrect"
288        );
289        self.pending_components.clear();
290        self.entity = new_entity_snapshot;
291        self.digest = new_entity_snapshot_id;
292
293        Ok(())
294    }
295
296    /// Delete the entity. Changes are immediately written to the store.
297    pub async fn delete(&mut self) -> anyhow::Result<()> {
298        if let Some(old_snapshot_id) = self.store.get_entity(&self.link).await? {
299            // Clean up old blob pins
300            self.store.del_blobs(&self.link, old_snapshot_id).await?;
301            // Delete the entity
302            self.store.del_entity(&self.link).await?;
303
304            // Clear the components on this entity handle
305            self.entity.components.clear();
306        }
307        Ok(())
308    }
309}
310
311// TODO: Store schema data in the network somehow, instead of just the schema hashes.
312
313impl<S: store::LeafStore + Clone> Leaf<S> {
314    /// Create a new leaf store around the given backend store.
315    pub fn new(store: S) -> Self {
316        Self { store }
317    }
318
319    pub async fn create_subspace(&self) -> Result<SubspaceId> {
320        self.store.create_subspace().await
321    }
322    pub async fn import_subspace_secret(&self, secret: SubspaceSecretKey) -> Result<SubspaceId> {
323        self.store.import_subspace_secret(secret).await
324    }
325    pub async fn get_subspace_secret(
326        &self,
327        subspace: SubspaceId,
328    ) -> Result<Option<SubspaceSecretKey>> {
329        self.store.get_subspace_secret(subspace).await
330    }
331    pub async fn create_namespace(&self) -> Result<NamespaceId> {
332        self.store.create_namespace().await
333    }
334    pub async fn import_namespace_secret(&self, secret: NamespaceSecretKey) -> Result<NamespaceId> {
335        self.store.import_namespace_secret(secret).await
336    }
337    pub async fn get_namespace_secret(
338        &self,
339        namespace: NamespaceId,
340    ) -> Result<Option<NamespaceSecretKey>> {
341        self.store.get_namespace_secret(namespace).await
342    }
343
344    /// Load an entity entry
345    pub async fn entity<L: Into<ExactLink>>(&self, link: L) -> Result<EntityEntry<S>> {
346        let link = link.into();
347        let Some(digest) = self.store.get_entity(&link).await? else {
348            return Ok(EntityEntry::Empty {
349                link,
350                store: self.store.clone(),
351            });
352        };
353        let bytes = self.store.get_blob(digest).await?;
354        let entity = Entity::deserialize(&mut &bytes[..])?;
355
356        Ok(EntityEntry::Entity(LoadedEntity {
357            store: self.store.clone(),
358            link,
359            entity,
360            digest,
361            pending_components: Default::default(),
362        }))
363    }
364
365    pub async fn del_entity<L: Into<ExactLink>>(&self, link: L) -> Result<()> {
366        let link = link.into();
367        if let Some(digest) = self.store.get_entity(&link).await? {
368            self.store.del_blobs(&link, digest).await?;
369        }
370        self.store.del_entity(&link).await?;
371        Ok(())
372    }
373
374    pub async fn list<L: Into<ExactLink>>(
375        &self,
376        link: L,
377        // TODO: add a recursive option to allow only listing the entities that are direct children
378        // of the link.
379    ) -> Result<impl Stream<Item = Result<ExactLink>> + '_> {
380        let link = link.into();
381        let s = self.store.list(link, None, None).await?;
382        Ok(s)
383    }
384
385    pub async fn list_namespaces(
386        &self,
387    ) -> anyhow::Result<impl Stream<Item = std::result::Result<NamespaceId, anyhow::Error>> + '_>
388    {
389        self.store.list_namespaces().await
390    }
391    pub async fn list_subspaces(
392        &self,
393    ) -> anyhow::Result<impl Stream<Item = std::result::Result<SubspaceId, anyhow::Error>> + '_>
394    {
395        self.store.list_subspaces().await
396    }
397}