pearl/storage/
observer.rs1use super::prelude::*;
2use tokio::task::JoinHandle;
3use tokio::sync::mpsc::{channel, Sender};
4
5const OBSERVER_CHANNEL_SIZE_LIMIT: usize = 1024;
7
8#[derive(Debug, Clone)]
9pub(crate) enum OperationType {
10 CreateActiveBlob = 0,
11 CloseActiveBlob = 1,
12 RestoreActiveBlob = 2,
13 ForceUpdateActiveBlob = 3,
14 TryDumpBlobIndexes = 4,
15 TryUpdateActiveBlob = 5,
16 DeferredDumpBlobIndexes = 6,
17 TryFsyncData = 7,
18}
19
20#[derive(Debug)]
21pub struct ActiveBlobStat {
22 pub records_count: usize,
23 pub index_memory: usize,
24 pub file_size: usize,
25}
26
27impl ActiveBlobStat {
28 pub fn new(records_count: usize, index_memory: usize, file_size: usize) -> Self {
29 Self {
30 records_count,
31 index_memory,
32 file_size,
33 }
34 }
35}
36
37pub type ActiveBlobPred = fn(Option<ActiveBlobStat>) -> bool;
38
39#[derive(Debug)]
40pub(crate) struct Msg {
41 pub(crate) optype: OperationType,
42 pub(crate) predicate: Option<ActiveBlobPred>,
43}
44
45impl Msg {
46 pub(crate) fn new(optype: OperationType, predicate: Option<ActiveBlobPred>) -> Self {
47 Self { optype, predicate }
48 }
49}
50
51#[derive(Debug)]
52pub(crate) struct Observer<K>
53where
54 for<'a> K: Key<'a>,
55{
56 state: ObserverState<K>
57}
58
59#[derive(Debug)]
60enum ObserverState<K>
61where
62 for<'a> K: Key<'a>,
63{
64 Created(Arc<Inner<K>>),
65 Running(Sender<Msg>, JoinHandle<()>),
66 Stopped
67}
68
69impl<K> Observer<K>
70where
71 for<'a> K: Key<'a> + 'static,
72{
73 pub(crate) fn new(inner: Arc<Inner<K>>) -> Self {
74 Self {
75 state: ObserverState::Created(inner)
76 }
77 }
78
79 pub(crate) fn run(&mut self) {
80 if !matches!(&self.state, ObserverState::Created(_)) {
81 return;
82 }
83
84 let ObserverState::Created(inner) = std::mem::replace(&mut self.state, ObserverState::Stopped) else {
85 unreachable!("State should be ObserverState::Created. It was checked at the beggining");
86 };
87
88 let (sender, receiver) = channel(OBSERVER_CHANNEL_SIZE_LIMIT);
89 let worker = ObserverWorker::new(
90 receiver,
91 inner
92 );
93 let handle = tokio::spawn(worker.run());
94
95 self.state = ObserverState::Running(sender, handle);
96 }
97
98 pub(crate) async fn shutdown(mut self) {
99 if let ObserverState::Running(sender, handle) = self.state {
100 std::mem::drop(sender); if let Err(err) = handle.await {
103 error!("Unexpected JoinError in Observer: {:?}", err);
104 }
105 }
106
107 self.state = ObserverState::Stopped;
108 }
109
110 pub(crate) async fn force_update_active_blob(&self, predicate: ActiveBlobPred) {
111 self.send_msg(Msg::new(
112 OperationType::ForceUpdateActiveBlob,
113 Some(predicate),
114 ))
115 .await
116 }
117
118 pub(crate) async fn restore_active_blob(&self) {
119 self.send_msg(Msg::new(OperationType::RestoreActiveBlob, None))
120 .await
121 }
122
123 pub(crate) async fn close_active_blob(&self) {
124 self.send_msg(Msg::new(OperationType::CloseActiveBlob, None))
125 .await
126 }
127
128 pub(crate) async fn create_active_blob(&self) {
129 self.send_msg(Msg::new(OperationType::CreateActiveBlob, None))
130 .await
131 }
132
133 pub(crate) async fn try_dump_old_blob_indexes(&self) {
134 self.send_msg(Msg::new(OperationType::TryDumpBlobIndexes, None))
135 .await
136 }
137
138 pub(crate) async fn defer_dump_old_blob_indexes(&self) {
139 self.send_msg(Msg::new(OperationType::DeferredDumpBlobIndexes, None))
140 .await
141 }
142
143 pub(crate) async fn try_update_active_blob(&self) {
144 self.send_msg(Msg::new(OperationType::TryUpdateActiveBlob, None))
145 .await
146 }
147
148 pub(crate) async fn try_fsync_data(&self) {
149 self.send_msg(Msg::new(OperationType::TryFsyncData, None))
150 .await
151 }
152
153 async fn send_msg(&self, msg: Msg) {
154 if let ObserverState::Running(sender, _) = &self.state {
155 let optype = msg.optype.clone();
156 if let Err(e) = sender.send(msg).await {
157 error!(
158 "Can't send message to worker:\nOperation: {:?}\nReason: {:?}",
159 optype, e
160 );
161 }
162 } else {
163 error!("storage observer task was not launched");
164 }
165 }
166}