noosphere_gateway/worker/cleanup.rs

1use crate::GatewayManager;
2use anyhow::{anyhow, Result};
3use noosphere_core::{
4    context::{HasMutableSphereContext, HasSphereContext, SphereCursor, COUNTERPART},
5    data::Did,
6};
7use noosphere_storage::{KeyValueStore, Storage};
8use std::time::Duration;
9use strum_macros::Display as EnumDisplay;
10use tokio::{
11    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
12    task::JoinHandle,
13};
14use tokio_stream::StreamExt;
15
/// Seconds between finishing all compaction tasks, and
/// starting a new cycle.
const PERIODIC_CLEANUP_INTERVAL_SECONDS: u64 = 60 * 60;

/// A unit of work that may be submitted to the cleanup worker via the
/// sender returned by `start_cleanup`.
#[derive(EnumDisplay)]
pub enum CleanupJob<C> {
    /// Compact redundant history in the wrapped sphere context; the payload
    /// is the sphere context whose history should be compacted.
    CompactHistory(C),
}
24
25pub fn start_cleanup<M, C, S>(
26    gateway_manager: M,
27) -> (UnboundedSender<CleanupJob<C>>, JoinHandle<Result<()>>)
28where
29    M: GatewayManager<C, S> + 'static,
30    C: HasMutableSphereContext<S> + 'static,
31    S: Storage + 'static,
32{
33    let (tx, rx) = unbounded_channel();
34
35    (tx.clone(), {
36        tokio::task::spawn(async move {
37            let _ = tokio::join!(
38                cleanup_task(rx),
39                periodic_compaction_task(tx, gateway_manager),
40            );
41            Ok(())
42        })
43    })
44}
45
46async fn cleanup_task<C, S>(mut receiver: UnboundedReceiver<CleanupJob<C>>) -> Result<()>
47where
48    C: HasMutableSphereContext<S>,
49    S: Storage + 'static,
50{
51    debug!("Cleanup worker started");
52
53    while let Some(job) = receiver.recv().await {
54        if let Err(error) = process_job(job).await {
55            warn!("Error processing cleanup job: {}", error);
56        }
57    }
58
59    Ok(())
60}
61
62async fn periodic_compaction_task<M, C, S>(tx: UnboundedSender<CleanupJob<C>>, gateway_manager: M)
63where
64    M: GatewayManager<C, S>,
65    C: HasMutableSphereContext<S>,
66    S: Storage + 'static,
67{
68    loop {
69        let mut stream = gateway_manager.experimental_worker_only_iter();
70        loop {
71            match stream.try_next().await {
72                Ok(Some(local_sphere)) => {
73                    if let Err(error) = tx.send(CleanupJob::CompactHistory(local_sphere)) {
74                        error!("Periodic compaction failed: {}", error);
75                    }
76                }
77                Ok(None) => break,
78                Err(error) => {
79                    error!("Could not iterate on managed spheres: {}", error);
80                }
81            }
82        }
83        tokio::time::sleep(Duration::from_secs(PERIODIC_CLEANUP_INTERVAL_SECONDS)).await;
84    }
85}
86
/// Execute a single [CleanupJob].
///
/// For [CleanupJob::CompactHistory]: walk the sphere's history backwards
/// from the latest version, find the longest contiguous run of recent
/// versions whose content changelogs do NOT touch the counterpart sphere's
/// entry, and compact that run into a single change. Returns an error if
/// the sphere's history advances while the job is running.
#[instrument(skip(job))]
async fn process_job<C, S>(job: CleanupJob<C>) -> Result<()>
where
    C: HasMutableSphereContext<S>,
    S: Storage + 'static,
{
    debug!("Running {}", job);

    match job {
        CleanupJob::CompactHistory(context) => {
            let mut cursor = SphereCursor::latest(context);
            let author = cursor.sphere_context().await?.author().clone();
            let sphere_identity = cursor.identity().await?;

            debug!(
                "Attempting history compaction for local sphere {}",
                sphere_identity
            );

            // The counterpart (user) sphere's DID, stored under a well-known
            // key in the gateway's database.
            let counterpart: Did = cursor
                .sphere_context()
                .await?
                .db()
                .require_key(COUNTERPART)
                .await?;

            // Look at the parent of the oldest gateway sphere version we have
            // checked so far; if that parent has a content changelog that
            // contains a change to the counterpart sphere root, that's the new
            // base, aka the intended parent version of the compact change we
            // are about to produce.
            let (compact_until, version_count) = {
                let mut version_count = 0usize;

                let sphere = cursor.to_sphere().await?;
                // Stream history from the latest version backwards,
                // unbounded (None = walk to the root).
                let stream = sphere.into_history_stream(None);

                tokio::pin!(stream);

                // Oldest version found so far that is safe to compact
                // through; None means nothing compactable was found.
                let mut compact_until = None;

                while let Some((cid, sphere)) = stream.try_next().await? {
                    // Does this version's content changelog add or remove
                    // the counterpart sphere's entry? If so, it marks the
                    // boundary we must not compact past.
                    let counterpart_changed = sphere
                        .get_content()
                        .await?
                        .get_changelog()
                        .await?
                        .changes
                        .iter()
                        .filter(|op| {
                            let key = match op {
                                noosphere_core::data::MapOperation::Add { key, .. } => key,
                                noosphere_core::data::MapOperation::Remove { key } => key,
                            };
                            key == &counterpart
                        })
                        .count()
                        > 0;

                    if counterpart_changed {
                        break;
                    }

                    compact_until = Some(cid);
                    version_count += 1;
                }

                (compact_until, version_count)
            };

            // Here we perform the actual compaction, so we take a mutable lock
            // on the sphere context until we are done
            if let Some(compact_until) = compact_until {
                debug!("Compacting {version_count} versions (through {compact_until})",);

                let cursor_version = cursor.version().await?;
                let mut context = cursor.sphere_context_mut().await?;
                let latest_version = context.version().await?;

                // Optimistic concurrency check: if the sphere's tip moved
                // since the history walk above, our compaction base may be
                // stale, so bail out rather than rewrite the wrong history.
                if cursor_version != latest_version {
                    return Err(anyhow!(
                        "Could not compact history; history advanced since job began"
                    ));
                }

                let sphere = context.sphere().await?;
                let new_tip = sphere.compact(&compact_until, &author).await?;
                // Point the database's version record for this sphere at the
                // newly produced compacted tip.
                context
                    .db_mut()
                    .set_version(&sphere_identity, &new_tip)
                    .await?;

                debug!("Finished compacting {version_count} versions; new tip is {new_tip}");
            }

            Ok(())
        }
    }
}
186
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use noosphere_common::helpers::wait;
    use noosphere_core::{
        authority::Access,
        context::{
            HasMutableSphereContext, HasSphereContext, SphereContentWrite, SphereCursor,
            COUNTERPART,
        },
        data::ContentType,
        helpers::{make_valid_link_record, simulated_sphere_context},
        tracing::initialize_tracing,
        view::Timeline,
    };
    use noosphere_storage::KeyValueStore;

    use crate::{
        worker::{start_cleanup, CleanupJob},
        SingleTenantGatewayManager,
    };

    #[tokio::test]
    async fn it_compacts_excess_name_record_changes_in_a_gateway_sphere() -> Result<()> {
        initialize_tracing(None);

        // Set up a simulated gateway sphere, and a simulated user
        // (counterpart) sphere sharing the gateway's database.
        let (mut gateway_sphere_context, _) =
            simulated_sphere_context(Access::ReadWrite, None).await?;
        let mut gateway_db = gateway_sphere_context
            .sphere_context()
            .await?
            .db_mut()
            .clone();
        let (user_sphere_context, _) =
            simulated_sphere_context(Access::ReadWrite, Some(gateway_db.clone())).await?;
        let user_sphere_identity = user_sphere_context.identity().await?;
        let user_sphere_version = user_sphere_context.version().await?;

        // Register the user sphere as the gateway's counterpart and link its
        // root into the gateway sphere; this save produces the base version
        // that compaction must not reach past.
        gateway_db
            .set_key(COUNTERPART, &user_sphere_identity)
            .await?;
        gateway_sphere_context
            .link_raw(&format!("{user_sphere_identity}"), &user_sphere_version)
            .await?;
        let base_version = gateway_sphere_context.save(None).await?;

        debug!("Base version: {}", base_version);

        let tl = Timeline::new(&gateway_db);
        let ts = tl.slice(&base_version, None);
        let versions = ts.to_chronological().await?;

        debug!(
            "Before task: {:#?}",
            versions
                .iter()
                .map(|cid| cid.to_string())
                .collect::<Vec<String>>()
        );

        // Start the cleanup worker under test.
        let manager = SingleTenantGatewayManager::new(
            gateway_sphere_context.clone(),
            user_sphere_identity.clone(),
        )
        .await?;
        let (tx, cleanup_worker) = start_cleanup(manager);

        wait(1).await;

        let mut latest_version = base_version;

        // Produce ten versions that each only rewrite a link record (none of
        // them touches the counterpart entry), so all ten are compactable.
        for _ in 0..10 {
            let (_, link_record, _) = make_valid_link_record(&mut gateway_db.clone()).await?;
            gateway_sphere_context
                .write(
                    &format!("link_record/{user_sphere_identity}"),
                    &ContentType::Text,
                    link_record.encode()?.as_bytes(),
                    None,
                )
                .await?;
            latest_version = gateway_sphere_context.save(None).await?;
        }

        let ts = tl.slice(&latest_version, None);
        let versions = ts.to_chronological().await?;

        debug!(
            "Before compaction: {:#?}",
            versions
                .iter()
                .map(|cid| cid.to_string())
                .collect::<Vec<String>>()
        );

        assert_eq!(13, versions.len());

        // Enqueue the compaction job and give the worker a moment to run it.
        tx.send(CleanupJob::CompactHistory(gateway_sphere_context.clone()))?;

        wait(1).await;

        debug!("Test proceeding");

        let cursor = SphereCursor::latest(gateway_sphere_context);
        let new_latest_version = cursor.version().await?;

        debug!("New latest version: {}", new_latest_version);

        // Compaction must have produced a new tip...
        assert_ne!(new_latest_version, latest_version);

        let ts = tl.slice(&new_latest_version, None);
        let versions = ts.to_chronological().await?;

        debug!(
            "After compaction: {:#?}",
            versions
                .iter()
                .map(|cid| cid.to_string())
                .collect::<Vec<String>>()
        );

        // ...collapsing the history to four versions, with the compacted
        // change sitting directly on top of the base version.
        assert_eq!(4, versions.len());

        assert_eq!(
            cursor.to_sphere().await?.get_parent().await?.unwrap().cid(),
            &base_version
        );

        cleanup_worker.abort();

        Ok(())
    }
}
319}