1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
use anyhow::{anyhow, Result};
use cid::Cid;
use noosphere_core::context::{AsyncFileBody, SphereFile};
use noosphere_core::data::Did;
use std::{
    collections::{BTreeMap, BTreeSet},
    future::Future,
    pin::Pin,
};
use tokio::task::JoinSet;

use super::SphereWriter;

/// The form of a callback that can be passed when flushing a [ChangeBuffer];
/// it receives the flattened additions (key to value) and the flattened
/// removals (keys). Note that it is generic for the value type that is passed
/// to the callback.
pub type ChangeBufferFlushCallback<T> =
    Box<dyn Fn(BTreeMap<String, T>, BTreeSet<String>) -> Pin<Box<dyn Future<Output = Result<()>>>>>;

/// A [ChangeBuffer] enables order-sensitive buffering of changes, and is meant
/// to be used when traversing incremental revisions of a sphere. If changes are
/// buffered in history order, they can be flushed and the flusher will be able
/// to work with a flattened representation of all those changes.
#[derive(Debug)]
pub struct ChangeBuffer<T> {
    // Maximum number of buffered changes (additions plus removals) before the
    // buffer must be flushed
    capacity: usize,
    // Keys whose latest buffered change is an addition, with their values
    added: BTreeMap<String, T>,
    // Keys whose latest buffered change is a removal
    removed: BTreeSet<String>,
}

impl<T> ChangeBuffer<T> {
    /// Initialize a [ChangeBuffer] with the given capacity. When the capacity
    /// is reached, the [ChangeBuffer] _must_ be flushed before additional
    /// changes are buffered.
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            added: BTreeMap::new(),
            removed: BTreeSet::new(),
        }
    }

    // Guard helper: errors when no more changes may be buffered
    fn assert_not_full(&self) -> Result<()> {
        if !self.is_full() {
            Ok(())
        } else {
            Err(anyhow!("Change buffer is full"))
        }
    }

    /// Returns true if the [ChangeBuffer] is full
    pub fn is_full(&self) -> bool {
        let buffered = self.added.len() + self.removed.len();
        buffered >= self.capacity
    }

    /// Buffer an additive change by key
    pub fn add(&mut self, key: String, value: T) -> Result<()> {
        self.assert_not_full()?;

        // An addition supersedes any previously buffered removal of the key
        self.removed.remove(&key);
        self.added.insert(key, value);

        Ok(())
    }

    /// Buffer a removal by key
    pub fn remove(&mut self, key: &str) -> Result<()> {
        self.assert_not_full()?;

        // A removal supersedes any previously buffered addition of the key
        self.added.remove(key);
        self.removed.insert(key.to_owned());

        Ok(())
    }

    /// Take all the buffered, flattened changes. This has the effect of
    /// resetting the [ChangeBuffer] internally.
    pub fn take(&mut self) -> (BTreeMap<String, T>, BTreeSet<String>) {
        let added = std::mem::take(&mut self.added);
        let removed = std::mem::take(&mut self.removed);
        (added, removed)
    }
}

impl<R> ChangeBuffer<SphereFile<R>>
where
    R: AsyncFileBody + 'static,
{
    /// Flush the [ChangeBuffer] to a [SphereWriter] for the case where we are
    /// dealing in sphere content
    #[instrument(skip(self))]
    pub async fn flush_to_writer(&mut self, writer: &SphereWriter) -> Result<()> {
        let (added, removed) = self.take();
        let mut changes = JoinSet::<Result<()>>::new();

        for (slug, mut file) in added {
            let writer = writer.clone();
            changes.spawn(async move {
                trace!("Writing '{slug}'...");
                writer.write_content(&slug, &mut file).await
            });
        }

        for slug in removed {
            let writer = writer.clone();
            changes.spawn(async move {
                trace!("Removing '{slug}'...");
                writer.remove_content(&slug).await
            });
        }

        while let Some(result) = changes.join_next().await {
            match result {
                Ok(result) => match result {
                    Ok(_) => (),
                    Err(error) => {
                        warn!("Content write failed: {}", error);
                    }
                },
                Err(error) => {
                    warn!("Content change task failed: {}", error);
                }
            };
        }

        Ok(())
    }
}

impl ChangeBuffer<(Did, Cid)> {
    /// Flush the [ChangeBuffer] to a [SphereWriter] for the case where we are
    /// dealing with peer references
    #[instrument]
    pub async fn flush_to_writer(&mut self, writer: &SphereWriter) -> Result<()> {
        let (added, removed) = self.take();
        let mut changes = JoinSet::<Result<()>>::new();

        for (petname, (did, cid)) in added {
            let writer = writer.clone();
            changes.spawn(async move {
                trace!("Writing @{petname}...");
                writer.symlink_peer(&did, &cid, &petname).await
            });
        }

        for petname in removed {
            let writer = writer.clone();
            changes.spawn(async move {
                trace!("Removing @{petname}...");
                writer.unlink_peer(&petname).await
            });
        }

        while let Some(result) = changes.join_next().await {
            match result {
                Ok(result) => match result {
                    Ok(_) => (),
                    Err(error) => {
                        warn!("Petname write failed: {}", error);
                    }
                },
                Err(error) => {
                    warn!("Petname change task failed: {}", error);
                }
            };
        }

        Ok(())
    }
}