noosphere_cli/native/render/
buffer.rs

1use anyhow::{anyhow, Result};
2use cid::Cid;
3use noosphere_core::context::{AsyncFileBody, SphereFile};
4use noosphere_core::data::Did;
5use std::{
6    collections::{BTreeMap, BTreeSet},
7    future::Future,
8    pin::Pin,
9};
10use tokio::task::JoinSet;
11
12use super::SphereWriter;
13
14/// The form a callback that can be passed when flushing a [ChangeBuffer]; note
15/// that it is generic for the argument that is passed to the callback.
16pub type ChangeBufferFlushCallback<T> =
17    Box<dyn Fn(BTreeMap<String, T>, BTreeSet<String>) -> Pin<Box<dyn Future<Output = Result<()>>>>>;
18
19/// A [ChangeBuffer] enables order-sensitive buffering of changes, and is meant
20/// to be used when traversing incremental revisions of a sphere. If changes are
21/// buffered in history order, they can be flushed and the flusher will be able
22/// to work with a flattened representation of all those changes.
23#[derive(Debug)]
24pub struct ChangeBuffer<T> {
25    capacity: usize,
26    added: BTreeMap<String, T>,
27    removed: BTreeSet<String>,
28}
29
30impl<T> ChangeBuffer<T> {
31    /// Initialize a [ChangeBuffer] with the given capacity. When the capacity
32    /// is reached, the [ChangeBuffer] _must_ be flushed before additional
33    /// changes are buffered.
34    pub fn new(capacity: usize) -> Self {
35        ChangeBuffer {
36            capacity,
37            added: BTreeMap::default(),
38            removed: BTreeSet::default(),
39        }
40    }
41
42    fn assert_not_full(&self) -> Result<()> {
43        if self.is_full() {
44            Err(anyhow!("Change buffer is full"))
45        } else {
46            Ok(())
47        }
48    }
49
50    /// Returns true if the [ChangeBuffer] is full
51    pub fn is_full(&self) -> bool {
52        (self.added.len() + self.removed.len()) >= self.capacity
53    }
54
55    /// Buffer an additive change by key
56    pub fn add(&mut self, key: String, value: T) -> Result<()> {
57        self.assert_not_full()?;
58
59        self.removed.remove(&key);
60        self.added.insert(key, value);
61
62        Ok(())
63    }
64
65    /// Buffer a removal by key
66    pub fn remove(&mut self, key: &str) -> Result<()> {
67        self.assert_not_full()?;
68
69        self.added.remove(key);
70        self.removed.insert(key.to_owned());
71
72        Ok(())
73    }
74
75    /// Take all the buffered, flattened changes. This has the effect of
76    /// resetting the [ChangeBuffer] internally.
77    pub fn take(&mut self) -> (BTreeMap<String, T>, BTreeSet<String>) {
78        (
79            std::mem::take(&mut self.added),
80            std::mem::take(&mut self.removed),
81        )
82    }
83}
84
85impl<R> ChangeBuffer<SphereFile<R>>
86where
87    R: AsyncFileBody + 'static,
88{
89    /// Flush the [ChangeBuffer] to a [SphereWriter] for the case where we are
90    /// dealing in sphere content
91    #[instrument(skip(self))]
92    pub async fn flush_to_writer(&mut self, writer: &SphereWriter) -> Result<()> {
93        let (added, removed) = self.take();
94        let mut changes = JoinSet::<Result<()>>::new();
95
96        for (slug, mut file) in added {
97            let writer = writer.clone();
98            changes.spawn(async move {
99                trace!("Writing '{slug}'...");
100                writer.write_content(&slug, &mut file).await
101            });
102        }
103
104        for slug in removed {
105            let writer = writer.clone();
106            changes.spawn(async move {
107                trace!("Removing '{slug}'...");
108                writer.remove_content(&slug).await
109            });
110        }
111
112        while let Some(result) = changes.join_next().await {
113            match result {
114                Ok(result) => match result {
115                    Ok(_) => (),
116                    Err(error) => {
117                        warn!("Content write failed: {}", error);
118                    }
119                },
120                Err(error) => {
121                    warn!("Content change task failed: {}", error);
122                }
123            };
124        }
125
126        Ok(())
127    }
128}
129
130impl ChangeBuffer<(Did, Cid)> {
131    /// Flush the [ChangeBuffer] to a [SphereWriter] for the case where we are
132    /// dealing with peer references
133    #[instrument]
134    pub async fn flush_to_writer(&mut self, writer: &SphereWriter) -> Result<()> {
135        let (added, removed) = self.take();
136        let mut changes = JoinSet::<Result<()>>::new();
137
138        for (petname, (did, cid)) in added {
139            let writer = writer.clone();
140            changes.spawn(async move {
141                trace!("Writing @{petname}...");
142                writer.symlink_peer(&did, &cid, &petname).await
143            });
144        }
145
146        for petname in removed {
147            let writer = writer.clone();
148            changes.spawn(async move {
149                trace!("Removing @{petname}...");
150                writer.unlink_peer(&petname).await
151            });
152        }
153
154        while let Some(result) = changes.join_next().await {
155            match result {
156                Ok(result) => match result {
157                    Ok(_) => (),
158                    Err(error) => {
159                        warn!("Petname write failed: {}", error);
160                    }
161                },
162                Err(error) => {
163                    warn!("Petname change task failed: {}", error);
164                }
165            };
166        }
167
168        Ok(())
169    }
170}