Skip to main content

arc_malachitebft_engine/
wal.rs

1use std::io;
2use std::marker::PhantomData;
3use std::path::PathBuf;
4
5use eyre::eyre;
6use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, RpcReplyPort, SpawnErr};
7use tokio::sync::{mpsc, oneshot};
8use tracing::{debug, error, info, warn};
9
10use malachitebft_core_types::{Context, Height};
11use malachitebft_metrics::SharedRegistry;
12use malachitebft_wal as wal;
13
14mod entry;
15mod iter;
16mod thread;
17
18pub use entry::WalCodec;
19pub use entry::WalEntry;
20pub use iter::log_entries;
21
22pub type WalRef<Ctx> = ActorRef<Msg<Ctx>>;
23
24pub struct Wal<Ctx, Codec> {
25    span: tracing::Span,
26    _marker: PhantomData<(Ctx, Codec)>,
27}
28
29impl<Ctx, Codec> Wal<Ctx, Codec>
30where
31    Ctx: Context,
32    Codec: WalCodec<Ctx>,
33{
34    pub fn new(span: tracing::Span) -> Self {
35        Self {
36            span,
37            _marker: PhantomData,
38        }
39    }
40
41    pub async fn spawn(
42        _ctx: &Ctx,
43        codec: Codec,
44        path: PathBuf,
45        _metrics: SharedRegistry,
46        span: tracing::Span,
47    ) -> Result<WalRef<Ctx>, SpawnErr> {
48        let (actor_ref, _) = Actor::spawn(None, Self::new(span), Args { path, codec }).await?;
49        Ok(actor_ref)
50    }
51}
52
53pub type WalReply<T> = RpcReplyPort<eyre::Result<T>>;
54
55pub enum Msg<Ctx: Context> {
56    StartedHeight(Ctx::Height, WalReply<Vec<io::Result<WalEntry<Ctx>>>>),
57    Reset(Ctx::Height, WalReply<()>),
58    Append(Ctx::Height, WalEntry<Ctx>, WalReply<()>),
59    Flush(WalReply<()>),
60    Dump,
61}
62
63pub struct Args<Codec> {
64    pub path: PathBuf,
65    pub codec: Codec,
66}
67
68pub struct State<Ctx: Context> {
69    height: Ctx::Height,
70    wal_sender: mpsc::Sender<self::thread::WalMsg<Ctx>>,
71    _handle: std::thread::JoinHandle<()>,
72}
73
74impl<Ctx, Codec> Wal<Ctx, Codec>
75where
76    Ctx: Context,
77    Codec: WalCodec<Ctx>,
78{
79    async fn handle_msg(
80        &self,
81        _myself: WalRef<Ctx>,
82        msg: Msg<Ctx>,
83        state: &mut State<Ctx>,
84    ) -> Result<(), ActorProcessingErr> {
85        match msg {
86            Msg::StartedHeight(height, reply_to) => {
87                if state.height == height {
88                    debug!(%height, "WAL already at height, returning empty entries");
89                    reply_to
90                        .send(Ok(Vec::new()))
91                        .map_err(|e| eyre!("Failed to send reply: {e}"))?;
92                    return Ok(());
93                }
94
95                state.height = height;
96
97                self.started_height(state, height, reply_to).await?;
98            }
99
100            Msg::Reset(height, reply_to) => {
101                self.reset(state, height, reply_to).await?;
102            }
103
104            Msg::Append(height, entry, reply_to) => {
105                if height != state.height {
106                    warn!(
107                        wal.height = %state.height, entry.height = %height,
108                        "Ignoring append, mismatched height: {entry:?}"
109                    );
110
111                    reply_to
112                        .send(Ok(()))
113                        .map_err(|e| eyre!("Failed to send reply: {e}"))?;
114                } else {
115                    self.write_log(state, entry, reply_to).await?;
116                }
117            }
118
119            Msg::Flush(reply_to) => {
120                self.flush_log(state, reply_to).await?;
121            }
122
123            Msg::Dump => {
124                state.wal_sender.send(self::thread::WalMsg::Dump).await?;
125            }
126        }
127
128        Ok(())
129    }
130
131    async fn reset(
132        &self,
133        state: &mut State<Ctx>,
134        height: Ctx::Height,
135        reply_to: WalReply<()>,
136    ) -> Result<(), ActorProcessingErr> {
137        let (tx, rx) = oneshot::channel();
138
139        state
140            .wal_sender
141            .send(self::thread::WalMsg::Reset(height, tx))
142            .await?;
143
144        let result = rx.await?;
145
146        reply_to
147            .send(result)
148            .map_err(|e| eyre!("Failed to send reply: {e}"))?;
149
150        Ok(())
151    }
152
153    async fn started_height(
154        &self,
155        state: &mut State<Ctx>,
156        height: Ctx::Height,
157        reply_to: WalReply<Vec<io::Result<WalEntry<Ctx>>>>,
158    ) -> Result<(), ActorProcessingErr> {
159        let (tx, rx) = oneshot::channel();
160
161        state
162            .wal_sender
163            .send(self::thread::WalMsg::StartedHeight(height, tx))
164            .await?;
165
166        let to_replay = rx.await?;
167
168        reply_to
169            .send(to_replay)
170            .map_err(|e| eyre!("Failed to send reply: {e}"))?;
171
172        Ok(())
173    }
174
175    async fn write_log(
176        &self,
177        state: &mut State<Ctx>,
178        msg: impl Into<WalEntry<Ctx>>,
179        reply_to: WalReply<()>,
180    ) -> Result<(), ActorProcessingErr> {
181        let entry = msg.into();
182        let (tx, rx) = oneshot::channel();
183
184        state
185            .wal_sender
186            .send(self::thread::WalMsg::Append(entry, tx))
187            .await?;
188
189        let result = rx.await?;
190
191        reply_to
192            .send(result)
193            .map_err(|e| eyre!("Failed to send reply: {e}"))?;
194
195        Ok(())
196    }
197
198    async fn flush_log(
199        &self,
200        state: &mut State<Ctx>,
201        reply_to: WalReply<()>,
202    ) -> Result<(), ActorProcessingErr> {
203        let (tx, rx) = oneshot::channel();
204
205        state
206            .wal_sender
207            .send(self::thread::WalMsg::Flush(tx))
208            .await?;
209
210        let result = rx.await?;
211
212        reply_to
213            .send(result)
214            .map_err(|e| eyre!("Failed to send reply: {e}"))?;
215
216        Ok(())
217    }
218}
219
220#[async_trait]
221impl<Ctx, Codec> Actor for Wal<Ctx, Codec>
222where
223    Ctx: Context,
224    Codec: WalCodec<Ctx>,
225{
226    type Msg = Msg<Ctx>;
227    type Arguments = Args<Codec>;
228    type State = State<Ctx>;
229
230    #[tracing::instrument(
231        name = "wal.pre_start",
232        parent = &self.span,
233        skip_all,
234    )]
235    async fn pre_start(
236        &self,
237        _myself: WalRef<Ctx>,
238        args: Self::Arguments,
239    ) -> Result<Self::State, ActorProcessingErr> {
240        let log = wal::Log::open(&args.path)?;
241        info!("Opened WAL at {}", args.path.display());
242
243        let (tx, rx) = mpsc::channel(100);
244
245        // Spawn a system thread to perform blocking WAL operations.
246        let handle = self::thread::spawn(self.span.clone(), log, args.codec, rx);
247
248        Ok(State {
249            height: Ctx::Height::ZERO,
250            wal_sender: tx,
251            _handle: handle,
252        })
253    }
254
255    #[tracing::instrument(
256        name = "wal",
257        parent = &self.span,
258        skip_all,
259        fields(height = %span_height(state.height, &msg)),
260    )]
261    async fn handle(
262        &self,
263        myself: WalRef<Ctx>,
264        msg: Self::Msg,
265        state: &mut Self::State,
266    ) -> Result<(), ActorProcessingErr> {
267        if let Err(e) = self.handle_msg(myself, msg, state).await {
268            error!("Failed to handle WAL message: {e}");
269        }
270
271        Ok(())
272    }
273
274    #[tracing::instrument(
275        name = "wal.post_stop",
276        parent = &self.span,
277        skip_all,
278        fields(height = %state.height),
279    )]
280    async fn post_stop(
281        &self,
282        _: WalRef<Ctx>,
283        state: &mut Self::State,
284    ) -> Result<(), ActorProcessingErr> {
285        info!("Shutting down WAL");
286
287        let _ = state.wal_sender.send(self::thread::WalMsg::Shutdown).await;
288
289        Ok(())
290    }
291}
292
293/// Use the height we are about to start instead of the current height
294/// for the tracing span of the WAL actor when starting a new height.
295fn span_height<Ctx: Context>(height: Ctx::Height, msg: &Msg<Ctx>) -> Ctx::Height {
296    if let Msg::StartedHeight(h, _) = msg {
297        *h
298    } else {
299        height
300    }
301}