pearl/storage/
observer.rs

1use super::prelude::*;
2use tokio::task::JoinHandle;
3use tokio::sync::mpsc::{channel, Sender};
4
/// Capacity of the bounded channel that carries messages from `Observer`
/// to `ObserverWorker`.
const OBSERVER_CHANNEL_SIZE_LIMIT: usize = 1024;
7
/// Kind of operation requested from the observer worker.
///
/// Each variant is sent by the `Observer` method named in its description.
/// The numeric discriminants are spelled out explicitly and must stay stable.
#[derive(Clone, Debug)]
pub(crate) enum OperationType {
    /// Sent by `Observer::create_active_blob`.
    CreateActiveBlob = 0,
    /// Sent by `Observer::close_active_blob`.
    CloseActiveBlob = 1,
    /// Sent by `Observer::restore_active_blob`.
    RestoreActiveBlob = 2,
    /// Sent by `Observer::force_update_active_blob`, together with a predicate.
    ForceUpdateActiveBlob = 3,
    /// Sent by `Observer::try_dump_old_blob_indexes`.
    TryDumpBlobIndexes = 4,
    /// Sent by `Observer::try_update_active_blob`.
    TryUpdateActiveBlob = 5,
    /// Sent by `Observer::defer_dump_old_blob_indexes`.
    DeferredDumpBlobIndexes = 6,
    /// Sent by `Observer::try_fsync_data`.
    TryFsyncData = 7,
}
19
/// Snapshot of three counters describing the active blob.
///
/// This is plain data (three `usize` fields), so it is cheap to copy and can
/// be compared for equality.
// NOTE(review): units of `index_memory` and `file_size` are presumably bytes — confirm
// against the code that fills this struct.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ActiveBlobStat {
    /// Number of records.
    pub records_count: usize,
    /// Memory attributed to the index.
    pub index_memory: usize,
    /// Size of the blob file.
    pub file_size: usize,
}

impl ActiveBlobStat {
    /// Builds a stat from its three counters, stored as given.
    pub fn new(records_count: usize, index_memory: usize, file_size: usize) -> Self {
        Self {
            records_count,
            index_memory,
            file_size,
        }
    }
}

/// Predicate over an optional [`ActiveBlobStat`].
///
/// It is a plain function pointer (not a closure), so it cannot capture state.
/// `Observer::force_update_active_blob` attaches one to the message it sends.
// NOTE(review): `None` presumably means there is no active blob — confirm with the worker.
pub type ActiveBlobPred = fn(Option<ActiveBlobStat>) -> bool;
38
39#[derive(Debug)]
40pub(crate) struct Msg {
41    pub(crate) optype: OperationType,
42    pub(crate) predicate: Option<ActiveBlobPred>,
43}
44
45impl Msg {
46    pub(crate) fn new(optype: OperationType, predicate: Option<ActiveBlobPred>) -> Self {
47        Self { optype, predicate }
48    }
49}
50
51#[derive(Debug)]
52pub(crate) struct Observer<K>
53where
54    for<'a> K: Key<'a>,
55{
56    state: ObserverState<K>
57}
58
59#[derive(Debug)]
60enum ObserverState<K>
61where
62    for<'a> K: Key<'a>,
63{
64    Created(Arc<Inner<K>>),
65    Running(Sender<Msg>, JoinHandle<()>),
66    Stopped
67}
68
69impl<K> Observer<K>
70where
71    for<'a> K: Key<'a> + 'static,
72{
73    pub(crate) fn new(inner: Arc<Inner<K>>) -> Self {
74        Self {
75            state: ObserverState::Created(inner)
76        }
77    }
78
79    pub(crate) fn run(&mut self) {
80        if !matches!(&self.state, ObserverState::Created(_)) {
81            return;
82        }
83
84        let ObserverState::Created(inner) = std::mem::replace(&mut self.state, ObserverState::Stopped) else {
85            unreachable!("State should be ObserverState::Created. It was checked at the beggining");
86        };
87
88        let (sender, receiver) = channel(OBSERVER_CHANNEL_SIZE_LIMIT);  
89        let worker = ObserverWorker::new(
90            receiver,
91            inner
92        );
93        let handle = tokio::spawn(worker.run());
94
95        self.state = ObserverState::Running(sender, handle);
96    }
97
98    pub(crate) async fn shutdown(mut self) {
99        if let ObserverState::Running(sender, handle) = self.state {
100            std::mem::drop(sender); // Drop sender. That trigger ObserverWorker stopping
101            // Wait for completion
102            if let Err(err) = handle.await {
103                error!("Unexpected JoinError in Observer: {:?}", err);
104            }
105        }
106
107        self.state = ObserverState::Stopped;
108    }
109
110    pub(crate) async fn force_update_active_blob(&self, predicate: ActiveBlobPred) {
111        self.send_msg(Msg::new(
112            OperationType::ForceUpdateActiveBlob,
113            Some(predicate),
114        ))
115        .await
116    }
117
118    pub(crate) async fn restore_active_blob(&self) {
119        self.send_msg(Msg::new(OperationType::RestoreActiveBlob, None))
120            .await
121    }
122
123    pub(crate) async fn close_active_blob(&self) {
124        self.send_msg(Msg::new(OperationType::CloseActiveBlob, None))
125            .await
126    }
127
128    pub(crate) async fn create_active_blob(&self) {
129        self.send_msg(Msg::new(OperationType::CreateActiveBlob, None))
130            .await
131    }
132
133    pub(crate) async fn try_dump_old_blob_indexes(&self) {
134        self.send_msg(Msg::new(OperationType::TryDumpBlobIndexes, None))
135            .await
136    }
137
138    pub(crate) async fn defer_dump_old_blob_indexes(&self) {
139        self.send_msg(Msg::new(OperationType::DeferredDumpBlobIndexes, None))
140            .await
141    }
142
143    pub(crate) async fn try_update_active_blob(&self) {
144        self.send_msg(Msg::new(OperationType::TryUpdateActiveBlob, None))
145            .await
146    }
147
148    pub(crate) async fn try_fsync_data(&self) {
149        self.send_msg(Msg::new(OperationType::TryFsyncData, None))
150            .await
151    }
152
153    async fn send_msg(&self, msg: Msg) {
154        if let ObserverState::Running(sender, _) = &self.state {
155            let optype = msg.optype.clone();
156            if let Err(e) = sender.send(msg).await {
157                error!(
158                    "Can't send message to worker:\nOperation: {:?}\nReason: {:?}",
159                    optype, e
160                );
161            }
162        } else {
163            error!("storage observer task was not launched");
164        }
165    }
166}