noosphere_gateway/worker/cleanup.rs

use crate::GatewayManager;
use anyhow::{anyhow, Result};
use noosphere_core::{
    context::{HasMutableSphereContext, HasSphereContext, SphereCursor, COUNTERPART},
    data::Did,
};
use noosphere_storage::{KeyValueStore, Storage};
use std::time::Duration;
use strum_macros::Display as EnumDisplay;
use tokio::{
    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    task::JoinHandle,
};
use tokio_stream::StreamExt;
use tracing::{debug, error, instrument, warn};

/// How often the periodic compaction sweep runs (hourly)
const PERIODIC_CLEANUP_INTERVAL_SECONDS: u64 = 60 * 60;

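/// A unit of work for the cleanup worker; currently the only job is
/// compacting a managed sphere's history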
#[derive(EnumDisplay)]
pub enum CleanupJob<C> {
    CompactHistory(C),
}

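/// Spawn the cleanup worker, returning a channel on which [CleanupJob]s may
/// be submitted along with the [JoinHandle] of the worker task; the worker
/// also periodically enqueues compaction jobs for every sphere managed by
/// the given [GatewayManager]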
pub fn start_cleanup<M, C, S>(
    gateway_manager: M,
) -> (UnboundedSender<CleanupJob<C>>, JoinHandle<Result<()>>)
where
    M: GatewayManager<C, S> + 'static,
    C: HasMutableSphereContext<S> + 'static,
    S: Storage + 'static,
{
    let (tx, rx) = unbounded_channel();

    (tx.clone(), {
        tokio::task::spawn(async move {
            let _ = tokio::join!(
                cleanup_task(rx),
                periodic_compaction_task(tx, gateway_manager),
            );
            Ok(())
        })
    })
}

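/// Drain the job queue, processing each [CleanupJob] in turn; a failed job
/// is logged and skipped rather than stopping the worker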
async fn cleanup_task<C, S>(mut receiver: UnboundedReceiver<CleanupJob<C>>) -> Result<()>
where
    C: HasMutableSphereContext<S>,
    S: Storage + 'static,
{
    debug!("Cleanup worker started");

    while let Some(job) = receiver.recv().await {
        if let Err(error) = process_job(job).await {
            warn!("Error processing cleanup job: {}", error);
        }
    }

    Ok(())
}

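/// On an interval of [PERIODIC_CLEANUP_INTERVAL_SECONDS], enqueue a history
/// compaction job for every sphere managed by the gateway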
async fn periodic_compaction_task<M, C, S>(tx: UnboundedSender<CleanupJob<C>>, gateway_manager: M)
where
    M: GatewayManager<C, S>,
    C: HasMutableSphereContext<S>,
    S: Storage + 'static,
{
    loop {
        let mut stream = gateway_manager.experimental_worker_only_iter();
        loop {
            match stream.try_next().await {
                Ok(Some(local_sphere)) => {
                    if let Err(error) = tx.send(CleanupJob::CompactHistory(local_sphere)) {
                        error!("Failed to enqueue compaction job: {}", error);
                    }
                }
                Ok(None) => break,
                Err(error) => {
                    error!("Could not iterate on managed spheres: {}", error);
                }
            }
        }
        tokio::time::sleep(Duration::from_secs(PERIODIC_CLEANUP_INTERVAL_SECONDS)).await;
    }
}

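/// Execute a single [CleanupJob]; for [CleanupJob::CompactHistory], walk the
/// sphere's history backwards from the latest version and compact every
/// revision newer than the last change to the counterpart's entry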
#[instrument(skip(job))]
async fn process_job<C, S>(job: CleanupJob<C>) -> Result<()>
where
    C: HasMutableSphereContext<S>,
    S: Storage + 'static,
{
    debug!("Running {}", job);

    match job {
        CleanupJob::CompactHistory(context) => {
            let mut cursor = SphereCursor::latest(context);
            let author = cursor.sphere_context().await?.author().clone();
            let sphere_identity = cursor.identity().await?;

            debug!(
                "Attempting history compaction for local sphere {}",
                sphere_identity
            );

            // The gateway records the identity of the user sphere it manages
            // under the COUNTERPART key
            let counterpart: Did = cursor
                .sphere_context()
                .await?
                .db()
                .require_key(COUNTERPART)
                .await?;

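            // Walk history from the most recent version backwards, counting
            // revisions until one is found that modified the counterpart's
            // entry; everything newer than that point can be compacted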
            let (compact_until, version_count) = {
                let mut version_count = 0usize;

                let sphere = cursor.to_sphere().await?;
                let stream = sphere.into_history_stream(None);

                tokio::pin!(stream);

                let mut compact_until = None;

                while let Some((cid, sphere)) = stream.try_next().await? {
                    // Did this revision add or remove the counterpart's entry?
                    let counterpart_changed = sphere
                        .get_content()
                        .await?
                        .get_changelog()
                        .await?
                        .changes
                        .iter()
                        .any(|op| {
                            let key = match op {
                                noosphere_core::data::MapOperation::Add { key, .. } => key,
                                noosphere_core::data::MapOperation::Remove { key } => key,
                            };
                            key == &counterpart
                        });

                    if counterpart_changed {
                        break;
                    }

                    compact_until = Some(cid);
                    version_count += 1;
                }

                (compact_until, version_count)
            };

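            // Compact only if the walk found at least one qualifying
            // revision at the tip of history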
            if let Some(compact_until) = compact_until {
                debug!("Compacting {version_count} versions (through {compact_until})");

                // Guard against a race: abandon the job if history has
                // advanced since the walk above began
                let cursor_version = cursor.version().await?;
                let mut context = cursor.sphere_context_mut().await?;
                let latest_version = context.version().await?;

                if cursor_version != latest_version {
                    return Err(anyhow!(
                        "Could not compact history; history advanced since job began"
                    ));
                }

                let sphere = context.sphere().await?;
                let new_tip = sphere.compact(&compact_until, &author).await?;
                context
                    .db_mut()
                    .set_version(&sphere_identity, &new_tip)
                    .await?;

                debug!("Finished compacting {version_count} versions; new tip is {new_tip}");
            }

            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use anyhow::Result;
    use noosphere_common::helpers::wait;
    use noosphere_core::{
        authority::Access,
        context::{
            HasMutableSphereContext, HasSphereContext, SphereContentWrite, SphereCursor,
            COUNTERPART,
        },
        data::ContentType,
        helpers::{make_valid_link_record, simulated_sphere_context},
        tracing::initialize_tracing,
        view::Timeline,
    };
    use noosphere_storage::KeyValueStore;
    use tracing::debug;

    use crate::{
        worker::{start_cleanup, CleanupJob},
        SingleTenantGatewayManager,
    };

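    /// End-to-end check that the cleanup worker compacts a run of name
    /// record revisions in a gateway sphere down to a single revision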
    #[tokio::test]
    async fn it_compacts_excess_name_record_changes_in_a_gateway_sphere() -> Result<()> {
        initialize_tracing(None);

        let (mut gateway_sphere_context, _) =
            simulated_sphere_context(Access::ReadWrite, None).await?;
        let mut gateway_db = gateway_sphere_context
            .sphere_context()
            .await?
            .db_mut()
            .clone();
        let (user_sphere_context, _) =
            simulated_sphere_context(Access::ReadWrite, Some(gateway_db.clone())).await?;
        let user_sphere_identity = user_sphere_context.identity().await?;
        let user_sphere_version = user_sphere_context.version().await?;

        // Register the user sphere as the gateway's counterpart and link its
        // current version into the gateway sphere
        gateway_db
            .set_key(COUNTERPART, &user_sphere_identity)
            .await?;
        gateway_sphere_context
            .link_raw(&format!("{user_sphere_identity}"), &user_sphere_version)
            .await?;
        let base_version = gateway_sphere_context.save(None).await?;

        debug!("Base version: {}", base_version);

        let tl = Timeline::new(&gateway_db);
        let ts = tl.slice(&base_version, None);
        let versions = ts.to_chronological().await?;

        debug!(
            "Before task: {:#?}",
            versions
                .iter()
                .map(|cid| cid.to_string())
                .collect::<Vec<String>>()
        );

        let manager = SingleTenantGatewayManager::new(
            gateway_sphere_context.clone(),
            user_sphere_identity.clone(),
        )
        .await?;
        let (tx, cleanup_worker) = start_cleanup(manager);

        // Give the worker a moment to start up
        wait(1).await;

        let mut latest_version = base_version;

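        // Write ten successive name records for the counterpart, each one
        // producing a new revision of the gateway sphere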
        for _ in 0..10 {
            let (_, link_record, _) = make_valid_link_record(&mut gateway_db.clone()).await?;
            gateway_sphere_context
                .write(
                    &format!("link_record/{user_sphere_identity}"),
                    &ContentType::Text,
                    link_record.encode()?.as_bytes(),
                    None,
                )
                .await?;
            latest_version = gateway_sphere_context.save(None).await?;
        }

        let ts = tl.slice(&latest_version, None);
        let versions = ts.to_chronological().await?;

        debug!(
            "Before compaction: {:#?}",
            versions
                .iter()
                .map(|cid| cid.to_string())
                .collect::<Vec<String>>()
        );

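        // Ten link record revisions on top of the base version and the
        // sphere's initial setup history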
        assert_eq!(13, versions.len());

        tx.send(CleanupJob::CompactHistory(gateway_sphere_context.clone()))?;

        wait(1).await;

        debug!("Test proceeding");

        let cursor = SphereCursor::latest(gateway_sphere_context);
        let new_latest_version = cursor.version().await?;

        debug!("New latest version: {}", new_latest_version);

        assert_ne!(new_latest_version, latest_version);

        let ts = tl.slice(&new_latest_version, None);
        let versions = ts.to_chronological().await?;

        debug!(
            "After compaction: {:#?}",
            versions
                .iter()
                .map(|cid| cid.to_string())
                .collect::<Vec<String>>()
        );

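        // The ten link record revisions have been collapsed into a single
        // revision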
        assert_eq!(4, versions.len());

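        // The compacted revision sits directly on top of the base version,
        // where the counterpart's entry was last modified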
        assert_eq!(
            cursor.to_sphere().await?.get_parent().await?.unwrap().cid(),
            &base_version
        );

        cleanup_worker.abort();

        Ok(())
    }
}