ella_engine/registry/
transaction_log.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use futures::TryStreamExt;
use object_store::ObjectStore;

use crate::{config::EllaConfig, Path};
use std::sync::Arc;

use super::{snapshot::Snapshot, transactions::Transaction};

#[derive(Debug)]
pub struct TransactionLog {
    path: Path,
    store: Arc<dyn ObjectStore>,
}

impl TransactionLog {
    const EXT: &'static str = "txt";
    const SNAPSHOTS: &'static str = "snapshots";
    const TRANSACTIONS: &'static str = "transactions";

    pub fn new(path: Path, store: Arc<dyn ObjectStore>) -> Self {
        Self { path, store }
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

    pub fn store(&self) -> &Arc<dyn ObjectStore> {
        &self.store
    }

    pub async fn load_config(&self) -> crate::Result<EllaConfig> {
        let Snapshot { config, .. } = self.load_snapshot().await?;
        Ok(config)
    }

    pub async fn create(&self, config: EllaConfig) -> crate::Result<()> {
        self.write_snapshot(&Snapshot::empty(config)).await
    }

    pub async fn commit<T>(&self, tsn: T) -> crate::Result<()>
    where
        T: Into<Transaction>,
    {
        let tsn: Transaction = tsn.into();
        let path = tsn
            .uuid()
            .encode_path(&self.path.join(Self::TRANSACTIONS), Self::EXT);
        let raw = serde_json::to_vec(&tsn)?;
        self.store.put(&path.as_path(), raw.into()).await?;
        Ok(())
    }

    pub async fn create_snapshot(&self) -> crate::Result<()> {
        let transactions = self.load_transactions().await?;

        if transactions.is_empty() {
            return Ok(());
        }

        let mut snapshot = self
            .load_newest_snapshot()
            .await?
            .ok_or_else(|| crate::EngineError::InvalidDatastore(self.path.to_string()))?;

        snapshot.commit_many(transactions.clone())?;
        self.write_snapshot(&snapshot).await?;
        self.clear_transactions(transactions).await?;
        Ok(())
    }

    pub async fn load_snapshot(&self) -> crate::Result<Snapshot> {
        let mut snapshot = self
            .load_newest_snapshot()
            .await?
            .ok_or_else(|| crate::EngineError::InvalidDatastore(self.path.to_string()))?;
        tracing::debug!(uuid=%snapshot.uuid, "loaded snapshot");
        snapshot.commit_many(self.load_transactions().await?)?;
        Ok(snapshot)
    }

    async fn write_snapshot(&self, snapshot: &Snapshot) -> crate::Result<()> {
        tracing::info!(uuid=%snapshot.uuid, "saving catalog snapshot");

        let path = snapshot
            .uuid
            .encode_path(&self.path.join(Self::SNAPSHOTS), Self::EXT);
        let raw = serde_json::to_vec(&snapshot)?;
        self.store.put(&path.as_path(), raw.into()).await?;
        Ok(())
    }

    async fn load_transactions(&self) -> crate::Result<Vec<Transaction>> {
        let mut file_list = self
            .store
            .list(Some(&self.path.join(Self::TRANSACTIONS).as_path()))
            .await?
            .try_collect::<Vec<_>>()
            .await?;
        // Sort oldest to newest
        file_list.sort_unstable_by(|a, b| a.location.filename().cmp(&b.location.filename()));

        let mut transactions = Vec::with_capacity(file_list.len());
        for file in file_list {
            let raw = self.store.get(&file.location).await?.bytes().await?;
            let t = serde_json::from_slice(&raw)?;
            transactions.push(t);
        }

        Ok(transactions)
    }

    async fn load_newest_snapshot(&self) -> crate::Result<Option<Snapshot>> {
        let mut file_list = self
            .store
            .list(Some(&self.path.join(Self::SNAPSHOTS).as_path()))
            .await?
            .try_collect::<Vec<_>>()
            .await?;

        if file_list.is_empty() {
            return Ok(None);
        }
        // Sort newest to oldest (reverse order)
        let (_, first, _) = file_list
            .select_nth_unstable_by(0, |a, b| b.location.filename().cmp(&a.location.filename()));
        let raw = self.store.get(&first.location).await?.bytes().await?;
        Ok(Some(serde_json::from_slice(&raw)?))
    }

    async fn clear_transactions<I>(&self, transactions: I) -> crate::Result<()>
    where
        I: IntoIterator<Item = Transaction>,
    {
        for t in transactions {
            let path = t
                .uuid()
                .encode_path(&self.path.join(Self::TRANSACTIONS), Self::EXT);
            self.store.delete(&path.as_path()).await?;
        }
        Ok(())
    }
}