1use std::io;
2use std::marker::PhantomData;
3use std::path::PathBuf;
4
5use eyre::eyre;
6use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, RpcReplyPort, SpawnErr};
7use tokio::sync::{mpsc, oneshot};
8use tracing::{debug, error, info, warn};
9
10use malachitebft_core_types::{Context, Height};
11use malachitebft_metrics::SharedRegistry;
12use malachitebft_wal as wal;
13
14mod entry;
15mod iter;
16mod thread;
17
18pub use entry::WalCodec;
19pub use entry::WalEntry;
20pub use iter::log_entries;
21
22pub type WalRef<Ctx> = ActorRef<Msg<Ctx>>;
23
24pub struct Wal<Ctx, Codec> {
25 span: tracing::Span,
26 _marker: PhantomData<(Ctx, Codec)>,
27}
28
29impl<Ctx, Codec> Wal<Ctx, Codec>
30where
31 Ctx: Context,
32 Codec: WalCodec<Ctx>,
33{
34 pub fn new(span: tracing::Span) -> Self {
35 Self {
36 span,
37 _marker: PhantomData,
38 }
39 }
40
41 pub async fn spawn(
42 _ctx: &Ctx,
43 codec: Codec,
44 path: PathBuf,
45 _metrics: SharedRegistry,
46 span: tracing::Span,
47 ) -> Result<WalRef<Ctx>, SpawnErr> {
48 let (actor_ref, _) = Actor::spawn(None, Self::new(span), Args { path, codec }).await?;
49 Ok(actor_ref)
50 }
51}
52
53pub type WalReply<T> = RpcReplyPort<eyre::Result<T>>;
54
55pub enum Msg<Ctx: Context> {
56 StartedHeight(Ctx::Height, WalReply<Vec<io::Result<WalEntry<Ctx>>>>),
57 Reset(Ctx::Height, WalReply<()>),
58 Append(Ctx::Height, WalEntry<Ctx>, WalReply<()>),
59 Flush(WalReply<()>),
60 Dump,
61}
62
/// Startup arguments consumed by the actor's `pre_start`.
pub struct Args<Codec> {
    /// Location of the log, passed to `wal::Log::open`.
    pub path: PathBuf,
    /// Codec handed over to the WAL thread.
    pub codec: Codec,
}
67
68pub struct State<Ctx: Context> {
69 height: Ctx::Height,
70 wal_sender: mpsc::Sender<self::thread::WalMsg<Ctx>>,
71 _handle: std::thread::JoinHandle<()>,
72}
73
74impl<Ctx, Codec> Wal<Ctx, Codec>
75where
76 Ctx: Context,
77 Codec: WalCodec<Ctx>,
78{
79 async fn handle_msg(
80 &self,
81 _myself: WalRef<Ctx>,
82 msg: Msg<Ctx>,
83 state: &mut State<Ctx>,
84 ) -> Result<(), ActorProcessingErr> {
85 match msg {
86 Msg::StartedHeight(height, reply_to) => {
87 if state.height == height {
88 debug!(%height, "WAL already at height, returning empty entries");
89 reply_to
90 .send(Ok(Vec::new()))
91 .map_err(|e| eyre!("Failed to send reply: {e}"))?;
92 return Ok(());
93 }
94
95 state.height = height;
96
97 self.started_height(state, height, reply_to).await?;
98 }
99
100 Msg::Reset(height, reply_to) => {
101 self.reset(state, height, reply_to).await?;
102 }
103
104 Msg::Append(height, entry, reply_to) => {
105 if height != state.height {
106 warn!(
107 wal.height = %state.height, entry.height = %height,
108 "Ignoring append, mismatched height: {entry:?}"
109 );
110
111 reply_to
112 .send(Ok(()))
113 .map_err(|e| eyre!("Failed to send reply: {e}"))?;
114 } else {
115 self.write_log(state, entry, reply_to).await?;
116 }
117 }
118
119 Msg::Flush(reply_to) => {
120 self.flush_log(state, reply_to).await?;
121 }
122
123 Msg::Dump => {
124 state.wal_sender.send(self::thread::WalMsg::Dump).await?;
125 }
126 }
127
128 Ok(())
129 }
130
131 async fn reset(
132 &self,
133 state: &mut State<Ctx>,
134 height: Ctx::Height,
135 reply_to: WalReply<()>,
136 ) -> Result<(), ActorProcessingErr> {
137 let (tx, rx) = oneshot::channel();
138
139 state
140 .wal_sender
141 .send(self::thread::WalMsg::Reset(height, tx))
142 .await?;
143
144 let result = rx.await?;
145
146 reply_to
147 .send(result)
148 .map_err(|e| eyre!("Failed to send reply: {e}"))?;
149
150 Ok(())
151 }
152
153 async fn started_height(
154 &self,
155 state: &mut State<Ctx>,
156 height: Ctx::Height,
157 reply_to: WalReply<Vec<io::Result<WalEntry<Ctx>>>>,
158 ) -> Result<(), ActorProcessingErr> {
159 let (tx, rx) = oneshot::channel();
160
161 state
162 .wal_sender
163 .send(self::thread::WalMsg::StartedHeight(height, tx))
164 .await?;
165
166 let to_replay = rx.await?;
167
168 reply_to
169 .send(to_replay)
170 .map_err(|e| eyre!("Failed to send reply: {e}"))?;
171
172 Ok(())
173 }
174
175 async fn write_log(
176 &self,
177 state: &mut State<Ctx>,
178 msg: impl Into<WalEntry<Ctx>>,
179 reply_to: WalReply<()>,
180 ) -> Result<(), ActorProcessingErr> {
181 let entry = msg.into();
182 let (tx, rx) = oneshot::channel();
183
184 state
185 .wal_sender
186 .send(self::thread::WalMsg::Append(entry, tx))
187 .await?;
188
189 let result = rx.await?;
190
191 reply_to
192 .send(result)
193 .map_err(|e| eyre!("Failed to send reply: {e}"))?;
194
195 Ok(())
196 }
197
198 async fn flush_log(
199 &self,
200 state: &mut State<Ctx>,
201 reply_to: WalReply<()>,
202 ) -> Result<(), ActorProcessingErr> {
203 let (tx, rx) = oneshot::channel();
204
205 state
206 .wal_sender
207 .send(self::thread::WalMsg::Flush(tx))
208 .await?;
209
210 let result = rx.await?;
211
212 reply_to
213 .send(result)
214 .map_err(|e| eyre!("Failed to send reply: {e}"))?;
215
216 Ok(())
217 }
218}
219
220#[async_trait]
221impl<Ctx, Codec> Actor for Wal<Ctx, Codec>
222where
223 Ctx: Context,
224 Codec: WalCodec<Ctx>,
225{
226 type Msg = Msg<Ctx>;
227 type Arguments = Args<Codec>;
228 type State = State<Ctx>;
229
230 #[tracing::instrument(
231 name = "wal.pre_start",
232 parent = &self.span,
233 skip_all,
234 )]
235 async fn pre_start(
236 &self,
237 _myself: WalRef<Ctx>,
238 args: Self::Arguments,
239 ) -> Result<Self::State, ActorProcessingErr> {
240 let log = wal::Log::open(&args.path)?;
241 info!("Opened WAL at {}", args.path.display());
242
243 let (tx, rx) = mpsc::channel(100);
244
245 let handle = self::thread::spawn(self.span.clone(), log, args.codec, rx);
247
248 Ok(State {
249 height: Ctx::Height::ZERO,
250 wal_sender: tx,
251 _handle: handle,
252 })
253 }
254
255 #[tracing::instrument(
256 name = "wal",
257 parent = &self.span,
258 skip_all,
259 fields(height = %span_height(state.height, &msg)),
260 )]
261 async fn handle(
262 &self,
263 myself: WalRef<Ctx>,
264 msg: Self::Msg,
265 state: &mut Self::State,
266 ) -> Result<(), ActorProcessingErr> {
267 if let Err(e) = self.handle_msg(myself, msg, state).await {
268 error!("Failed to handle WAL message: {e}");
269 }
270
271 Ok(())
272 }
273
274 #[tracing::instrument(
275 name = "wal.post_stop",
276 parent = &self.span,
277 skip_all,
278 fields(height = %state.height),
279 )]
280 async fn post_stop(
281 &self,
282 _: WalRef<Ctx>,
283 state: &mut Self::State,
284 ) -> Result<(), ActorProcessingErr> {
285 info!("Shutting down WAL");
286
287 let _ = state.wal_sender.send(self::thread::WalMsg::Shutdown).await;
288
289 Ok(())
290 }
291}
292
293fn span_height<Ctx: Context>(height: Ctx::Height, msg: &Msg<Ctx>) -> Ctx::Height {
296 if let Msg::StartedHeight(h, _) = msg {
297 *h
298 } else {
299 height
300 }
301}