noosphere_cli/native/render/
buffer.rs1use anyhow::{anyhow, Result};
2use cid::Cid;
3use noosphere_core::context::{AsyncFileBody, SphereFile};
4use noosphere_core::data::Did;
5use std::{
6 collections::{BTreeMap, BTreeSet},
7 future::Future,
8 pin::Pin,
9};
10use tokio::task::JoinSet;
11
12use super::SphereWriter;
13
14pub type ChangeBufferFlushCallback<T> =
17 Box<dyn Fn(BTreeMap<String, T>, BTreeSet<String>) -> Pin<Box<dyn Future<Output = Result<()>>>>>;
18
19#[derive(Debug)]
24pub struct ChangeBuffer<T> {
25 capacity: usize,
26 added: BTreeMap<String, T>,
27 removed: BTreeSet<String>,
28}
29
30impl<T> ChangeBuffer<T> {
31 pub fn new(capacity: usize) -> Self {
35 ChangeBuffer {
36 capacity,
37 added: BTreeMap::default(),
38 removed: BTreeSet::default(),
39 }
40 }
41
42 fn assert_not_full(&self) -> Result<()> {
43 if self.is_full() {
44 Err(anyhow!("Change buffer is full"))
45 } else {
46 Ok(())
47 }
48 }
49
50 pub fn is_full(&self) -> bool {
52 (self.added.len() + self.removed.len()) >= self.capacity
53 }
54
55 pub fn add(&mut self, key: String, value: T) -> Result<()> {
57 self.assert_not_full()?;
58
59 self.removed.remove(&key);
60 self.added.insert(key, value);
61
62 Ok(())
63 }
64
65 pub fn remove(&mut self, key: &str) -> Result<()> {
67 self.assert_not_full()?;
68
69 self.added.remove(key);
70 self.removed.insert(key.to_owned());
71
72 Ok(())
73 }
74
75 pub fn take(&mut self) -> (BTreeMap<String, T>, BTreeSet<String>) {
78 (
79 std::mem::take(&mut self.added),
80 std::mem::take(&mut self.removed),
81 )
82 }
83}
84
85impl<R> ChangeBuffer<SphereFile<R>>
86where
87 R: AsyncFileBody + 'static,
88{
89 #[instrument(skip(self))]
92 pub async fn flush_to_writer(&mut self, writer: &SphereWriter) -> Result<()> {
93 let (added, removed) = self.take();
94 let mut changes = JoinSet::<Result<()>>::new();
95
96 for (slug, mut file) in added {
97 let writer = writer.clone();
98 changes.spawn(async move {
99 trace!("Writing '{slug}'...");
100 writer.write_content(&slug, &mut file).await
101 });
102 }
103
104 for slug in removed {
105 let writer = writer.clone();
106 changes.spawn(async move {
107 trace!("Removing '{slug}'...");
108 writer.remove_content(&slug).await
109 });
110 }
111
112 while let Some(result) = changes.join_next().await {
113 match result {
114 Ok(result) => match result {
115 Ok(_) => (),
116 Err(error) => {
117 warn!("Content write failed: {}", error);
118 }
119 },
120 Err(error) => {
121 warn!("Content change task failed: {}", error);
122 }
123 };
124 }
125
126 Ok(())
127 }
128}
129
130impl ChangeBuffer<(Did, Cid)> {
131 #[instrument]
134 pub async fn flush_to_writer(&mut self, writer: &SphereWriter) -> Result<()> {
135 let (added, removed) = self.take();
136 let mut changes = JoinSet::<Result<()>>::new();
137
138 for (petname, (did, cid)) in added {
139 let writer = writer.clone();
140 changes.spawn(async move {
141 trace!("Writing @{petname}...");
142 writer.symlink_peer(&did, &cid, &petname).await
143 });
144 }
145
146 for petname in removed {
147 let writer = writer.clone();
148 changes.spawn(async move {
149 trace!("Removing @{petname}...");
150 writer.unlink_peer(&petname).await
151 });
152 }
153
154 while let Some(result) = changes.join_next().await {
155 match result {
156 Ok(result) => match result {
157 Ok(_) => (),
158 Err(error) => {
159 warn!("Petname write failed: {}", error);
160 }
161 },
162 Err(error) => {
163 warn!("Petname change task failed: {}", error);
164 }
165 };
166 }
167
168 Ok(())
169 }
170}