Skip to main content

ursula_runtime/
engine_wal.rs

1use std::fs::{self, File, OpenOptions};
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::time::Instant;
5
6use serde::{Deserialize, Serialize};
7use ursula_shard::{BucketStreamId, ShardPlacement};
8use ursula_stream::{StreamCommand, StreamErrorCode, StreamSnapshot};
9
10use crate::cold_store::ColdStoreHandle;
11use crate::command::{GroupSnapshot, GroupWriteCommand};
12use crate::engine::{
13    GroupAppendBatchFuture, GroupAppendFuture, GroupBootstrapStreamFuture, GroupCloseStreamFuture,
14    GroupColdHotBacklogFuture, GroupCreateStreamFuture, GroupDeleteSnapshotFuture,
15    GroupDeleteStreamFuture, GroupEngine, GroupEngineCreateFuture, GroupEngineError,
16    GroupEngineFactory, GroupEngineMetrics, GroupFlushColdFuture, GroupForkRefFuture,
17    GroupHeadStreamFuture, GroupInstallSnapshotFuture, GroupPlanColdFlushFuture,
18    GroupPlanNextColdFlushBatchFuture, GroupPlanNextColdFlushFuture, GroupPublishSnapshotFuture,
19    GroupReadSnapshotFuture, GroupReadStreamFuture, GroupSnapshotFuture,
20    GroupTouchStreamAccessFuture, GroupWriteResponse,
21};
22use crate::engine_in_memory::InMemoryGroupEngine;
23use crate::metrics::elapsed_ns;
24use crate::request::{
25    AppendBatchRequest, AppendRequest, BootstrapStreamRequest, CloseStreamRequest,
26    ColdWriteAdmission, CreateStreamRequest, DeleteSnapshotRequest, DeleteStreamRequest,
27    FlushColdRequest, HeadStreamRequest, PlanColdFlushRequest, PlanGroupColdFlushRequest,
28    PublishSnapshotRequest, ReadSnapshotRequest, ReadStreamRequest, StreamAppendCount,
29    TouchStreamAccessResponse,
30};
31
32#[derive(Debug, Clone)]
33pub struct WalGroupEngineFactory {
34    root: PathBuf,
35    cold_store: Option<ColdStoreHandle>,
36}
37
38impl WalGroupEngineFactory {
39    pub fn new(root: impl Into<PathBuf>) -> Self {
40        Self {
41            root: root.into(),
42            cold_store: None,
43        }
44    }
45
46    pub fn with_cold_store(root: impl Into<PathBuf>, cold_store: Option<ColdStoreHandle>) -> Self {
47        Self {
48            root: root.into(),
49            cold_store,
50        }
51    }
52}
53
54impl GroupEngineFactory for WalGroupEngineFactory {
55    fn create<'a>(
56        &'a self,
57        placement: ShardPlacement,
58        metrics: GroupEngineMetrics,
59    ) -> GroupEngineCreateFuture<'a> {
60        Box::pin(async move {
61            let engine: Box<dyn GroupEngine> = Box::new(WalGroupEngine::open(
62                &self.root,
63                placement,
64                metrics,
65                self.cold_store.clone(),
66            ));
67            Ok(engine)
68        })
69    }
70}
71
/// A [`GroupEngine`] that serves a group from an in-memory engine and makes its writes durable
/// in a per-group JSON-lines write-ahead log.
///
/// State is rebuilt by replaying that log when the engine is opened.
pub struct WalGroupEngine {
    /// Live in-memory state. Write paths replace it only after the matching WAL append has
    /// succeeded.
    inner: InMemoryGroupEngine,
    /// Path of this group's WAL file, as computed by `group_log_path`.
    log_path: PathBuf,
    /// Placement of the group this engine serves; attached to the WAL metrics it records.
    placement: ShardPlacement,
    /// Metrics sink that receives WAL batch sizes and write/sync timings.
    metrics: GroupEngineMetrics,
    /// Error message from a failed replay at open time. While set, `ensure_ready` makes every
    /// operation fail with it.
    init_error: Option<String>,
}
79
/// One line of a group's JSON-lines WAL file.
///
/// Serialized as an internally tagged object: the `wal_record` field carries the snake_case
/// variant name (`"command"` or `"snapshot"`) next to that variant's fields. Renaming a
/// variant, a field, or the tag would stop existing log lines from decoding as a `WalRecord`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "wal_record", rename_all = "snake_case")]
enum WalRecord {
    /// A single group write command; replayed with `apply_replayed_write_command`.
    Command {
        // NOTE(review): boxed, presumably to keep the enum small — confirm.
        command: Box<GroupWriteCommand>,
    },
    /// A group snapshot written by `install_snapshot`; replayed with `install_snapshot_parts`.
    Snapshot {
        group_commit_index: u64,
        stream_snapshot: StreamSnapshot,
        stream_append_counts: Vec<StreamAppendCount>,
    },
}
92
93impl WalGroupEngine {
94    fn open(
95        root: &Path,
96        placement: ShardPlacement,
97        metrics: GroupEngineMetrics,
98        cold_store: Option<ColdStoreHandle>,
99    ) -> Self {
100        let log_path = group_log_path(root, placement);
101        match replay_group_log(&log_path) {
102            Ok(mut inner) => {
103                inner.cold_store = cold_store;
104                Self {
105                    inner,
106                    log_path,
107                    placement,
108                    metrics,
109                    init_error: None,
110                }
111            }
112            Err(err) => Self {
113                inner: InMemoryGroupEngine {
114                    cold_store,
115                    ..InMemoryGroupEngine::default()
116                },
117                log_path,
118                placement,
119                metrics,
120                init_error: Some(err.message().to_owned()),
121            },
122        }
123    }
124
125    fn ensure_ready(&self) -> Result<(), GroupEngineError> {
126        match &self.init_error {
127            Some(message) => Err(GroupEngineError::new(message.clone())),
128            None => Ok(()),
129        }
130    }
131
132    fn append_record(&self, command: &GroupWriteCommand) -> Result<(), GroupEngineError> {
133        self.append_records(std::slice::from_ref(command))
134    }
135
136    fn append_records(&self, commands: &[GroupWriteCommand]) -> Result<(), GroupEngineError> {
137        if commands.is_empty() {
138            return Ok(());
139        }
140        let Some(parent) = self.log_path.parent() else {
141            return Err(GroupEngineError::new(format!(
142                "WAL path '{}' has no parent directory",
143                self.log_path.display()
144            )));
145        };
146        fs::create_dir_all(parent).map_err(|err| {
147            GroupEngineError::new(format!("create WAL dir '{}': {err}", parent.display()))
148        })?;
149        let write_started_at = Instant::now();
150        let mut file = OpenOptions::new()
151            .create(true)
152            .append(true)
153            .open(&self.log_path)
154            .map_err(|err| {
155                GroupEngineError::new(format!("open WAL '{}': {err}", self.log_path.display()))
156            })?;
157        for command in commands {
158            let record = WalRecord::Command {
159                command: Box::new(command.clone()),
160            };
161            serde_json::to_writer(&mut file, &record).map_err(|err| {
162                GroupEngineError::new(format!("encode WAL '{}': {err}", self.log_path.display()))
163            })?;
164            file.write_all(b"\n").map_err(|err| {
165                GroupEngineError::new(format!("write WAL '{}': {err}", self.log_path.display()))
166            })?;
167        }
168        let write_ns = elapsed_ns(write_started_at);
169        let sync_started_at = Instant::now();
170        file.sync_data().map_err(|err| {
171            GroupEngineError::new(format!("sync WAL '{}': {err}", self.log_path.display()))
172        })?;
173        self.metrics.record_wal_batch(
174            self.placement,
175            commands.len(),
176            write_ns,
177            elapsed_ns(sync_started_at),
178        );
179        Ok(())
180    }
181
182    fn append_snapshot_record(&self, snapshot: &GroupSnapshot) -> Result<(), GroupEngineError> {
183        let record = WalRecord::Snapshot {
184            group_commit_index: snapshot.group_commit_index,
185            stream_snapshot: snapshot.stream_snapshot.clone(),
186            stream_append_counts: snapshot.stream_append_counts.clone(),
187        };
188        let Some(parent) = self.log_path.parent() else {
189            return Err(GroupEngineError::new(format!(
190                "WAL path '{}' has no parent directory",
191                self.log_path.display()
192            )));
193        };
194        fs::create_dir_all(parent).map_err(|err| {
195            GroupEngineError::new(format!("create WAL dir '{}': {err}", parent.display()))
196        })?;
197        let write_started_at = Instant::now();
198        let mut file = OpenOptions::new()
199            .create(true)
200            .append(true)
201            .open(&self.log_path)
202            .map_err(|err| {
203                GroupEngineError::new(format!("open WAL '{}': {err}", self.log_path.display()))
204            })?;
205        serde_json::to_writer(&mut file, &record).map_err(|err| {
206            GroupEngineError::new(format!("encode WAL '{}': {err}", self.log_path.display()))
207        })?;
208        file.write_all(b"\n").map_err(|err| {
209            GroupEngineError::new(format!("write WAL '{}': {err}", self.log_path.display()))
210        })?;
211        let write_ns = elapsed_ns(write_started_at);
212        let sync_started_at = Instant::now();
213        file.sync_data().map_err(|err| {
214            GroupEngineError::new(format!("sync WAL '{}': {err}", self.log_path.display()))
215        })?;
216        self.metrics
217            .record_wal_batch(self.placement, 1, write_ns, elapsed_ns(sync_started_at));
218        Ok(())
219    }
220
221    fn commit_access_if_needed(
222        &mut self,
223        stream_id: &BucketStreamId,
224        now_ms: u64,
225        renew_ttl: bool,
226        placement: ShardPlacement,
227    ) -> Result<Option<TouchStreamAccessResponse>, GroupEngineError> {
228        if !self
229            .inner
230            .access_requires_write(stream_id, now_ms, renew_ttl)?
231        {
232            return Ok(None);
233        }
234        let command = GroupWriteCommand::TouchStreamAccess {
235            stream_id: stream_id.clone(),
236            now_ms,
237            renew_ttl,
238        };
239        let mut preview = self.inner.clone();
240        let response = match preview.apply_committed_write(command.clone(), placement)? {
241            GroupWriteResponse::TouchStreamAccess(response) => response,
242            other => {
243                return Err(GroupEngineError::new(format!(
244                    "unexpected touch stream access write response: {other:?}"
245                )));
246            }
247        };
248        if response.changed || response.expired {
249            self.append_record(&command)?;
250        }
251        self.inner = preview;
252        if response.expired {
253            return Err(GroupEngineError::stream(
254                StreamErrorCode::StreamNotFound,
255                format!("stream '{stream_id}' does not exist"),
256            ));
257        }
258        Ok(Some(response))
259    }
260}
261
262impl GroupEngine for WalGroupEngine {
263    fn create_stream<'a>(
264        &'a mut self,
265        request: CreateStreamRequest,
266        placement: ShardPlacement,
267    ) -> GroupCreateStreamFuture<'a> {
268        Box::pin(async move {
269            self.ensure_ready()?;
270            let command = GroupWriteCommand::from(request);
271            let mut preview = self.inner.clone();
272            let response = match preview.apply_committed_write(command.clone(), placement)? {
273                GroupWriteResponse::CreateStream(response) => response,
274                other => {
275                    return Err(GroupEngineError::new(format!(
276                        "unexpected create stream write response: {other:?}"
277                    )));
278                }
279            };
280            if !response.already_exists {
281                self.append_record(&command)?;
282            }
283            self.inner = preview;
284            Ok(response)
285        })
286    }
287
288    fn create_stream_with_cold_admission<'a>(
289        &'a mut self,
290        request: CreateStreamRequest,
291        placement: ShardPlacement,
292        admission: ColdWriteAdmission,
293    ) -> GroupCreateStreamFuture<'a> {
294        if !admission.is_enabled() {
295            return self.create_stream(request, placement);
296        }
297        Box::pin(async move {
298            self.ensure_ready()?;
299            let command = GroupWriteCommand::from(request.clone());
300            let mut preview = self.inner.clone();
301            let response =
302                preview.create_stream_with_admission_inner(request, placement, admission)?;
303            if !response.already_exists {
304                self.append_record(&command)?;
305            }
306            self.inner = preview;
307            Ok(response)
308        })
309    }
310
311    fn head_stream<'a>(
312        &'a mut self,
313        request: HeadStreamRequest,
314        placement: ShardPlacement,
315    ) -> GroupHeadStreamFuture<'a> {
316        Box::pin(async move {
317            self.ensure_ready()?;
318            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
319            self.inner.head_stream(request, placement).await
320        })
321    }
322
323    fn read_stream<'a>(
324        &'a mut self,
325        request: ReadStreamRequest,
326        placement: ShardPlacement,
327    ) -> GroupReadStreamFuture<'a> {
328        Box::pin(async move {
329            self.ensure_ready()?;
330            self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
331            self.inner.read_stream(request, placement).await
332        })
333    }
334
335    fn publish_snapshot<'a>(
336        &'a mut self,
337        request: PublishSnapshotRequest,
338        placement: ShardPlacement,
339    ) -> GroupPublishSnapshotFuture<'a> {
340        Box::pin(async move {
341            self.ensure_ready()?;
342            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
343            let command = GroupWriteCommand::from(request);
344            let mut preview = self.inner.clone();
345            let response = match preview.apply_committed_write(command.clone(), placement)? {
346                GroupWriteResponse::PublishSnapshot(response) => response,
347                other => {
348                    return Err(GroupEngineError::new(format!(
349                        "unexpected publish snapshot write response: {other:?}"
350                    )));
351                }
352            };
353            self.append_record(&command)?;
354            self.inner = preview;
355            Ok(response)
356        })
357    }
358
359    fn read_snapshot<'a>(
360        &'a mut self,
361        request: ReadSnapshotRequest,
362        placement: ShardPlacement,
363    ) -> GroupReadSnapshotFuture<'a> {
364        Box::pin(async move {
365            self.ensure_ready()?;
366            self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
367            self.inner.read_snapshot(request, placement).await
368        })
369    }
370
371    fn delete_snapshot<'a>(
372        &'a mut self,
373        request: DeleteSnapshotRequest,
374        placement: ShardPlacement,
375    ) -> GroupDeleteSnapshotFuture<'a> {
376        Box::pin(async move {
377            self.ensure_ready()?;
378            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
379            self.inner.delete_snapshot(request, placement).await
380        })
381    }
382
383    fn bootstrap_stream<'a>(
384        &'a mut self,
385        request: BootstrapStreamRequest,
386        placement: ShardPlacement,
387    ) -> GroupBootstrapStreamFuture<'a> {
388        Box::pin(async move {
389            self.ensure_ready()?;
390            self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
391            self.inner.bootstrap_stream(request, placement).await
392        })
393    }
394
395    fn touch_stream_access<'a>(
396        &'a mut self,
397        stream_id: BucketStreamId,
398        now_ms: u64,
399        renew_ttl: bool,
400        placement: ShardPlacement,
401    ) -> GroupTouchStreamAccessFuture<'a> {
402        Box::pin(async move {
403            self.ensure_ready()?;
404            let command = GroupWriteCommand::TouchStreamAccess {
405                stream_id,
406                now_ms,
407                renew_ttl,
408            };
409            let mut preview = self.inner.clone();
410            let response = match preview.apply_committed_write(command.clone(), placement)? {
411                GroupWriteResponse::TouchStreamAccess(response) => response,
412                other => {
413                    return Err(GroupEngineError::new(format!(
414                        "unexpected touch stream access write response: {other:?}"
415                    )));
416                }
417            };
418            if response.changed || response.expired {
419                self.append_record(&command)?;
420            }
421            self.inner = preview;
422            Ok(response)
423        })
424    }
425
426    fn add_fork_ref<'a>(
427        &'a mut self,
428        stream_id: BucketStreamId,
429        now_ms: u64,
430        placement: ShardPlacement,
431    ) -> GroupForkRefFuture<'a> {
432        Box::pin(async move {
433            self.ensure_ready()?;
434            let command = GroupWriteCommand::AddForkRef { stream_id, now_ms };
435            let mut preview = self.inner.clone();
436            let response = match preview.apply_committed_write(command.clone(), placement)? {
437                GroupWriteResponse::AddForkRef(response) => response,
438                other => {
439                    return Err(GroupEngineError::new(format!(
440                        "unexpected add fork ref write response: {other:?}"
441                    )));
442                }
443            };
444            self.append_record(&command)?;
445            self.inner = preview;
446            Ok(response)
447        })
448    }
449
450    fn release_fork_ref<'a>(
451        &'a mut self,
452        stream_id: BucketStreamId,
453        placement: ShardPlacement,
454    ) -> GroupForkRefFuture<'a> {
455        Box::pin(async move {
456            self.ensure_ready()?;
457            let command = GroupWriteCommand::ReleaseForkRef { stream_id };
458            let mut preview = self.inner.clone();
459            let response = match preview.apply_committed_write(command.clone(), placement)? {
460                GroupWriteResponse::ReleaseForkRef(response) => response,
461                other => {
462                    return Err(GroupEngineError::new(format!(
463                        "unexpected release fork ref write response: {other:?}"
464                    )));
465                }
466            };
467            self.append_record(&command)?;
468            self.inner = preview;
469            Ok(response)
470        })
471    }
472
473    fn close_stream<'a>(
474        &'a mut self,
475        request: CloseStreamRequest,
476        placement: ShardPlacement,
477    ) -> GroupCloseStreamFuture<'a> {
478        Box::pin(async move {
479            self.ensure_ready()?;
480            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
481            let command = GroupWriteCommand::from(request);
482            let mut preview = self.inner.clone();
483            let response = match preview.apply_committed_write(command.clone(), placement)? {
484                GroupWriteResponse::CloseStream(response) => response,
485                other => {
486                    return Err(GroupEngineError::new(format!(
487                        "unexpected close stream write response: {other:?}"
488                    )));
489                }
490            };
491            self.append_record(&command)?;
492            self.inner = preview;
493            Ok(response)
494        })
495    }
496
497    fn delete_stream<'a>(
498        &'a mut self,
499        request: DeleteStreamRequest,
500        placement: ShardPlacement,
501    ) -> GroupDeleteStreamFuture<'a> {
502        Box::pin(async move {
503            self.ensure_ready()?;
504            let command = GroupWriteCommand::from(request);
505            let mut preview = self.inner.clone();
506            let response = match preview.apply_committed_write(command.clone(), placement)? {
507                GroupWriteResponse::DeleteStream(response) => response,
508                other => {
509                    return Err(GroupEngineError::new(format!(
510                        "unexpected delete stream write response: {other:?}"
511                    )));
512                }
513            };
514            self.append_record(&command)?;
515            self.inner = preview;
516            Ok(response)
517        })
518    }
519
520    fn append<'a>(
521        &'a mut self,
522        request: AppendRequest,
523        placement: ShardPlacement,
524    ) -> GroupAppendFuture<'a> {
525        Box::pin(async move {
526            self.ensure_ready()?;
527            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
528            let command = GroupWriteCommand::from(request);
529            let mut preview = self.inner.clone();
530            let response = match preview.apply_committed_write(command.clone(), placement)? {
531                GroupWriteResponse::Append(response) => response,
532                other => {
533                    return Err(GroupEngineError::new(format!(
534                        "unexpected append write response: {other:?}"
535                    )));
536                }
537            };
538            self.append_record(&command)?;
539            self.inner = preview;
540            Ok(response)
541        })
542    }
543
544    fn append_with_cold_admission<'a>(
545        &'a mut self,
546        request: AppendRequest,
547        placement: ShardPlacement,
548        admission: ColdWriteAdmission,
549    ) -> GroupAppendFuture<'a> {
550        if !admission.is_enabled() {
551            return self.append(request, placement);
552        }
553        Box::pin(async move {
554            self.ensure_ready()?;
555            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
556            let command = GroupWriteCommand::from(request.clone());
557            let mut preview = self.inner.clone();
558            let response = preview.append_with_admission_inner(request, placement, admission)?;
559            if !response.deduplicated {
560                self.append_record(&command)?;
561            }
562            self.inner = preview;
563            Ok(response)
564        })
565    }
566
567    fn append_batch<'a>(
568        &'a mut self,
569        request: AppendBatchRequest,
570        placement: ShardPlacement,
571    ) -> GroupAppendBatchFuture<'a> {
572        Box::pin(async move {
573            self.ensure_ready()?;
574            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
575            let command = GroupWriteCommand::from(request);
576            let mut preview = self.inner.clone();
577            let response = match preview.apply_committed_write(command.clone(), placement)? {
578                GroupWriteResponse::AppendBatch(response) => response,
579                other => {
580                    return Err(GroupEngineError::new(format!(
581                        "unexpected append batch write response: {other:?}"
582                    )));
583                }
584            };
585            if response
586                .items
587                .iter()
588                .any(|item| matches!(item, Ok(response) if !response.deduplicated))
589            {
590                self.append_record(&command)?;
591            }
592            self.inner = preview;
593            Ok(response)
594        })
595    }
596
597    fn append_batch_with_cold_admission<'a>(
598        &'a mut self,
599        request: AppendBatchRequest,
600        placement: ShardPlacement,
601        admission: ColdWriteAdmission,
602    ) -> GroupAppendBatchFuture<'a> {
603        if !admission.is_enabled() {
604            return self.append_batch(request, placement);
605        }
606        Box::pin(async move {
607            self.ensure_ready()?;
608            self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
609            let command = GroupWriteCommand::from(request.clone());
610            let mut preview = self.inner.clone();
611            let response =
612                preview.append_batch_with_admission_inner(request, placement, admission)?;
613            if response
614                .items
615                .iter()
616                .any(|item| matches!(item, Ok(response) if !response.deduplicated))
617            {
618                self.append_record(&command)?;
619            }
620            self.inner = preview;
621            Ok(response)
622        })
623    }
624
625    fn flush_cold<'a>(
626        &'a mut self,
627        request: FlushColdRequest,
628        placement: ShardPlacement,
629    ) -> GroupFlushColdFuture<'a> {
630        Box::pin(async move {
631            self.ensure_ready()?;
632            let command = GroupWriteCommand::from(request);
633            let mut preview = self.inner.clone();
634            let response = match preview.apply_committed_write(command.clone(), placement)? {
635                GroupWriteResponse::FlushCold(response) => response,
636                other => {
637                    return Err(GroupEngineError::new(format!(
638                        "unexpected flush cold write response: {other:?}"
639                    )));
640                }
641            };
642            self.append_record(&command)?;
643            self.inner = preview;
644            Ok(response)
645        })
646    }
647
648    fn plan_cold_flush<'a>(
649        &'a mut self,
650        request: PlanColdFlushRequest,
651        placement: ShardPlacement,
652    ) -> GroupPlanColdFlushFuture<'a> {
653        Box::pin(async move {
654            self.ensure_ready()?;
655            self.inner.plan_cold_flush(request, placement).await
656        })
657    }
658
659    fn plan_next_cold_flush<'a>(
660        &'a mut self,
661        request: PlanGroupColdFlushRequest,
662        placement: ShardPlacement,
663    ) -> GroupPlanNextColdFlushFuture<'a> {
664        Box::pin(async move {
665            self.ensure_ready()?;
666            self.inner.plan_next_cold_flush(request, placement).await
667        })
668    }
669
670    fn plan_next_cold_flush_batch<'a>(
671        &'a mut self,
672        request: PlanGroupColdFlushRequest,
673        placement: ShardPlacement,
674        max_candidates: usize,
675    ) -> GroupPlanNextColdFlushBatchFuture<'a> {
676        Box::pin(async move {
677            self.ensure_ready()?;
678            self.inner
679                .plan_next_cold_flush_batch(request, placement, max_candidates)
680                .await
681        })
682    }
683
684    fn cold_hot_backlog<'a>(
685        &'a mut self,
686        stream_id: BucketStreamId,
687        placement: ShardPlacement,
688    ) -> GroupColdHotBacklogFuture<'a> {
689        Box::pin(async move {
690            self.ensure_ready()?;
691            self.inner.cold_hot_backlog(stream_id, placement).await
692        })
693    }
694
695    fn snapshot<'a>(&'a mut self, placement: ShardPlacement) -> GroupSnapshotFuture<'a> {
696        Box::pin(async move {
697            self.ensure_ready()?;
698            self.inner.snapshot(placement).await
699        })
700    }
701
702    fn install_snapshot<'a>(
703        &'a mut self,
704        snapshot: GroupSnapshot,
705    ) -> GroupInstallSnapshotFuture<'a> {
706        Box::pin(async move {
707            self.ensure_ready()?;
708            let mut preview = self.inner.clone();
709            preview.install_snapshot(snapshot.clone()).await?;
710            self.append_snapshot_record(&snapshot)?;
711            self.inner = preview;
712            Ok(())
713        })
714    }
715}
716
717pub(crate) fn group_log_path(root: &Path, placement: ShardPlacement) -> PathBuf {
718    root.join(format!("core-{}", placement.core_id.0))
719        .join(format!("group-{}.jsonl", placement.raft_group_id.0))
720}
721
722fn replay_group_log(log_path: &Path) -> Result<InMemoryGroupEngine, GroupEngineError> {
723    if !log_path.exists() {
724        return Ok(InMemoryGroupEngine::default());
725    }
726
727    let file = File::open(log_path).map_err(|err| {
728        GroupEngineError::new(format!("open WAL '{}': {err}", log_path.display()))
729    })?;
730    let reader = BufReader::new(file);
731    let mut inner = InMemoryGroupEngine::default();
732    for (line_index, line) in reader.lines().enumerate() {
733        let line = line.map_err(|err| {
734            GroupEngineError::new(format!(
735                "read WAL '{}' line {}: {err}",
736                log_path.display(),
737                line_index + 1
738            ))
739        })?;
740        if line.trim().is_empty() {
741            continue;
742        }
743        if let Ok(record) = serde_json::from_str::<WalRecord>(&line) {
744            match record {
745                WalRecord::Command { command } => inner
746                    .apply_replayed_write_command(*command)
747                    .map_err(|err| {
748                        GroupEngineError::new(format!(
749                            "replay WAL command '{}' line {}: {err}",
750                            log_path.display(),
751                            line_index + 1
752                        ))
753                    })?,
754                WalRecord::Snapshot {
755                    group_commit_index,
756                    stream_snapshot,
757                    stream_append_counts,
758                } => inner
759                    .install_snapshot_parts(
760                        group_commit_index,
761                        stream_snapshot,
762                        stream_append_counts,
763                    )
764                    .map_err(|err| {
765                        GroupEngineError::new(format!(
766                            "replay WAL snapshot '{}' line {}: {err}",
767                            log_path.display(),
768                            line_index + 1
769                        ))
770                    })?,
771            }
772            continue;
773        }
774
775        let command = serde_json::from_str::<StreamCommand>(&line).map_err(|err| {
776            GroupEngineError::new(format!(
777                "decode WAL '{}' line {}: {err}",
778                log_path.display(),
779                line_index + 1
780            ))
781        })?;
782        inner.apply_replayed_command(command).map_err(|err| {
783            GroupEngineError::new(format!(
784                "replay WAL '{}' line {}: {err}",
785                log_path.display(),
786                line_index + 1
787            ))
788        })?;
789    }
790    Ok(inner)
791}