1use std::fs::{self, File, OpenOptions};
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::time::Instant;
5
6use serde::{Deserialize, Serialize};
7use ursula_shard::{BucketStreamId, ShardPlacement};
8use ursula_stream::{StreamCommand, StreamErrorCode, StreamSnapshot};
9
10use crate::cold_store::ColdStoreHandle;
11use crate::command::{GroupSnapshot, GroupWriteCommand};
12use crate::engine::{
13 GroupAppendBatchFuture, GroupAppendFuture, GroupBootstrapStreamFuture, GroupCloseStreamFuture,
14 GroupColdHotBacklogFuture, GroupCreateStreamFuture, GroupDeleteSnapshotFuture,
15 GroupDeleteStreamFuture, GroupEngine, GroupEngineCreateFuture, GroupEngineError,
16 GroupEngineFactory, GroupEngineMetrics, GroupFlushColdFuture, GroupForkRefFuture,
17 GroupHeadStreamFuture, GroupInstallSnapshotFuture, GroupPlanColdFlushFuture,
18 GroupPlanNextColdFlushBatchFuture, GroupPlanNextColdFlushFuture, GroupPublishSnapshotFuture,
19 GroupReadSnapshotFuture, GroupReadStreamFuture, GroupSnapshotFuture,
20 GroupTouchStreamAccessFuture, GroupWriteResponse,
21};
22use crate::engine_in_memory::InMemoryGroupEngine;
23use crate::metrics::elapsed_ns;
24use crate::request::{
25 AppendBatchRequest, AppendRequest, BootstrapStreamRequest, CloseStreamRequest,
26 ColdWriteAdmission, CreateStreamRequest, DeleteSnapshotRequest, DeleteStreamRequest,
27 FlushColdRequest, HeadStreamRequest, PlanColdFlushRequest, PlanGroupColdFlushRequest,
28 PublishSnapshotRequest, ReadSnapshotRequest, ReadStreamRequest, StreamAppendCount,
29 TouchStreamAccessResponse,
30};
31
/// Factory that builds [`WalGroupEngine`] instances whose log files live under a shared root
/// directory.
#[derive(Debug, Clone)]
pub struct WalGroupEngineFactory {
    /// Directory passed to `group_log_path` to derive each group's log file location.
    root: PathBuf,
    /// Optional cold-store handle; a clone is handed to every engine this factory creates.
    cold_store: Option<ColdStoreHandle>,
}
37
38impl WalGroupEngineFactory {
39 pub fn new(root: impl Into<PathBuf>) -> Self {
40 Self {
41 root: root.into(),
42 cold_store: None,
43 }
44 }
45
46 pub fn with_cold_store(root: impl Into<PathBuf>, cold_store: Option<ColdStoreHandle>) -> Self {
47 Self {
48 root: root.into(),
49 cold_store,
50 }
51 }
52}
53
54impl GroupEngineFactory for WalGroupEngineFactory {
55 fn create<'a>(
56 &'a self,
57 placement: ShardPlacement,
58 metrics: GroupEngineMetrics,
59 ) -> GroupEngineCreateFuture<'a> {
60 Box::pin(async move {
61 let engine: Box<dyn GroupEngine> = Box::new(WalGroupEngine::open(
62 &self.root,
63 placement,
64 metrics,
65 self.cold_store.clone(),
66 ));
67 Ok(engine)
68 })
69 }
70}
71
/// Group engine that keeps its working state in an [`InMemoryGroupEngine`] and appends write
/// records to a newline-delimited JSON log file.
pub struct WalGroupEngine {
    /// In-memory state; rebuilt by replaying the log when the engine is opened.
    inner: InMemoryGroupEngine,
    /// Location of this group's log file.
    log_path: PathBuf,
    /// Placement reported to the metrics recorder for every WAL batch.
    placement: ShardPlacement,
    /// Receives WAL batch sizes and timings via `record_wal_batch`.
    metrics: GroupEngineMetrics,
    /// Message of the replay error hit while opening, if any; when set, `ensure_ready` fails.
    init_error: Option<String>,
}
79
/// One line of the log file.
///
/// Serialized as a JSON object with an internal `wal_record` tag whose value is the
/// snake_case variant name (`"command"` or `"snapshot"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "wal_record", rename_all = "snake_case")]
enum WalRecord {
    /// A single group write command; replayed through `apply_replayed_write_command`.
    Command {
        command: Box<GroupWriteCommand>,
    },
    /// The parts of a `GroupSnapshot`; replayed through `install_snapshot_parts`.
    Snapshot {
        group_commit_index: u64,
        stream_snapshot: StreamSnapshot,
        stream_append_counts: Vec<StreamAppendCount>,
    },
}
92
93impl WalGroupEngine {
94 fn open(
95 root: &Path,
96 placement: ShardPlacement,
97 metrics: GroupEngineMetrics,
98 cold_store: Option<ColdStoreHandle>,
99 ) -> Self {
100 let log_path = group_log_path(root, placement);
101 match replay_group_log(&log_path) {
102 Ok(mut inner) => {
103 inner.cold_store = cold_store;
104 Self {
105 inner,
106 log_path,
107 placement,
108 metrics,
109 init_error: None,
110 }
111 }
112 Err(err) => Self {
113 inner: InMemoryGroupEngine {
114 cold_store,
115 ..InMemoryGroupEngine::default()
116 },
117 log_path,
118 placement,
119 metrics,
120 init_error: Some(err.message().to_owned()),
121 },
122 }
123 }
124
125 fn ensure_ready(&self) -> Result<(), GroupEngineError> {
126 match &self.init_error {
127 Some(message) => Err(GroupEngineError::new(message.clone())),
128 None => Ok(()),
129 }
130 }
131
132 fn append_record(&self, command: &GroupWriteCommand) -> Result<(), GroupEngineError> {
133 self.append_records(std::slice::from_ref(command))
134 }
135
136 fn append_records(&self, commands: &[GroupWriteCommand]) -> Result<(), GroupEngineError> {
137 if commands.is_empty() {
138 return Ok(());
139 }
140 let Some(parent) = self.log_path.parent() else {
141 return Err(GroupEngineError::new(format!(
142 "WAL path '{}' has no parent directory",
143 self.log_path.display()
144 )));
145 };
146 fs::create_dir_all(parent).map_err(|err| {
147 GroupEngineError::new(format!("create WAL dir '{}': {err}", parent.display()))
148 })?;
149 let write_started_at = Instant::now();
150 let mut file = OpenOptions::new()
151 .create(true)
152 .append(true)
153 .open(&self.log_path)
154 .map_err(|err| {
155 GroupEngineError::new(format!("open WAL '{}': {err}", self.log_path.display()))
156 })?;
157 for command in commands {
158 let record = WalRecord::Command {
159 command: Box::new(command.clone()),
160 };
161 serde_json::to_writer(&mut file, &record).map_err(|err| {
162 GroupEngineError::new(format!("encode WAL '{}': {err}", self.log_path.display()))
163 })?;
164 file.write_all(b"\n").map_err(|err| {
165 GroupEngineError::new(format!("write WAL '{}': {err}", self.log_path.display()))
166 })?;
167 }
168 let write_ns = elapsed_ns(write_started_at);
169 let sync_started_at = Instant::now();
170 file.sync_data().map_err(|err| {
171 GroupEngineError::new(format!("sync WAL '{}': {err}", self.log_path.display()))
172 })?;
173 self.metrics.record_wal_batch(
174 self.placement,
175 commands.len(),
176 write_ns,
177 elapsed_ns(sync_started_at),
178 );
179 Ok(())
180 }
181
182 fn append_snapshot_record(&self, snapshot: &GroupSnapshot) -> Result<(), GroupEngineError> {
183 let record = WalRecord::Snapshot {
184 group_commit_index: snapshot.group_commit_index,
185 stream_snapshot: snapshot.stream_snapshot.clone(),
186 stream_append_counts: snapshot.stream_append_counts.clone(),
187 };
188 let Some(parent) = self.log_path.parent() else {
189 return Err(GroupEngineError::new(format!(
190 "WAL path '{}' has no parent directory",
191 self.log_path.display()
192 )));
193 };
194 fs::create_dir_all(parent).map_err(|err| {
195 GroupEngineError::new(format!("create WAL dir '{}': {err}", parent.display()))
196 })?;
197 let write_started_at = Instant::now();
198 let mut file = OpenOptions::new()
199 .create(true)
200 .append(true)
201 .open(&self.log_path)
202 .map_err(|err| {
203 GroupEngineError::new(format!("open WAL '{}': {err}", self.log_path.display()))
204 })?;
205 serde_json::to_writer(&mut file, &record).map_err(|err| {
206 GroupEngineError::new(format!("encode WAL '{}': {err}", self.log_path.display()))
207 })?;
208 file.write_all(b"\n").map_err(|err| {
209 GroupEngineError::new(format!("write WAL '{}': {err}", self.log_path.display()))
210 })?;
211 let write_ns = elapsed_ns(write_started_at);
212 let sync_started_at = Instant::now();
213 file.sync_data().map_err(|err| {
214 GroupEngineError::new(format!("sync WAL '{}': {err}", self.log_path.display()))
215 })?;
216 self.metrics
217 .record_wal_batch(self.placement, 1, write_ns, elapsed_ns(sync_started_at));
218 Ok(())
219 }
220
221 fn commit_access_if_needed(
222 &mut self,
223 stream_id: &BucketStreamId,
224 now_ms: u64,
225 renew_ttl: bool,
226 placement: ShardPlacement,
227 ) -> Result<Option<TouchStreamAccessResponse>, GroupEngineError> {
228 if !self
229 .inner
230 .access_requires_write(stream_id, now_ms, renew_ttl)?
231 {
232 return Ok(None);
233 }
234 let command = GroupWriteCommand::TouchStreamAccess {
235 stream_id: stream_id.clone(),
236 now_ms,
237 renew_ttl,
238 };
239 let mut preview = self.inner.clone();
240 let response = match preview.apply_committed_write(command.clone(), placement)? {
241 GroupWriteResponse::TouchStreamAccess(response) => response,
242 other => {
243 return Err(GroupEngineError::new(format!(
244 "unexpected touch stream access write response: {other:?}"
245 )));
246 }
247 };
248 if response.changed || response.expired {
249 self.append_record(&command)?;
250 }
251 self.inner = preview;
252 if response.expired {
253 return Err(GroupEngineError::stream(
254 StreamErrorCode::StreamNotFound,
255 format!("stream '{stream_id}' does not exist"),
256 ));
257 }
258 Ok(Some(response))
259 }
260}
261
262impl GroupEngine for WalGroupEngine {
263 fn create_stream<'a>(
264 &'a mut self,
265 request: CreateStreamRequest,
266 placement: ShardPlacement,
267 ) -> GroupCreateStreamFuture<'a> {
268 Box::pin(async move {
269 self.ensure_ready()?;
270 let command = GroupWriteCommand::from(request);
271 let mut preview = self.inner.clone();
272 let response = match preview.apply_committed_write(command.clone(), placement)? {
273 GroupWriteResponse::CreateStream(response) => response,
274 other => {
275 return Err(GroupEngineError::new(format!(
276 "unexpected create stream write response: {other:?}"
277 )));
278 }
279 };
280 if !response.already_exists {
281 self.append_record(&command)?;
282 }
283 self.inner = preview;
284 Ok(response)
285 })
286 }
287
288 fn create_stream_with_cold_admission<'a>(
289 &'a mut self,
290 request: CreateStreamRequest,
291 placement: ShardPlacement,
292 admission: ColdWriteAdmission,
293 ) -> GroupCreateStreamFuture<'a> {
294 if !admission.is_enabled() {
295 return self.create_stream(request, placement);
296 }
297 Box::pin(async move {
298 self.ensure_ready()?;
299 let command = GroupWriteCommand::from(request.clone());
300 let mut preview = self.inner.clone();
301 let response =
302 preview.create_stream_with_admission_inner(request, placement, admission)?;
303 if !response.already_exists {
304 self.append_record(&command)?;
305 }
306 self.inner = preview;
307 Ok(response)
308 })
309 }
310
311 fn head_stream<'a>(
312 &'a mut self,
313 request: HeadStreamRequest,
314 placement: ShardPlacement,
315 ) -> GroupHeadStreamFuture<'a> {
316 Box::pin(async move {
317 self.ensure_ready()?;
318 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
319 self.inner.head_stream(request, placement).await
320 })
321 }
322
323 fn read_stream<'a>(
324 &'a mut self,
325 request: ReadStreamRequest,
326 placement: ShardPlacement,
327 ) -> GroupReadStreamFuture<'a> {
328 Box::pin(async move {
329 self.ensure_ready()?;
330 self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
331 self.inner.read_stream(request, placement).await
332 })
333 }
334
335 fn publish_snapshot<'a>(
336 &'a mut self,
337 request: PublishSnapshotRequest,
338 placement: ShardPlacement,
339 ) -> GroupPublishSnapshotFuture<'a> {
340 Box::pin(async move {
341 self.ensure_ready()?;
342 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
343 let command = GroupWriteCommand::from(request);
344 let mut preview = self.inner.clone();
345 let response = match preview.apply_committed_write(command.clone(), placement)? {
346 GroupWriteResponse::PublishSnapshot(response) => response,
347 other => {
348 return Err(GroupEngineError::new(format!(
349 "unexpected publish snapshot write response: {other:?}"
350 )));
351 }
352 };
353 self.append_record(&command)?;
354 self.inner = preview;
355 Ok(response)
356 })
357 }
358
359 fn read_snapshot<'a>(
360 &'a mut self,
361 request: ReadSnapshotRequest,
362 placement: ShardPlacement,
363 ) -> GroupReadSnapshotFuture<'a> {
364 Box::pin(async move {
365 self.ensure_ready()?;
366 self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
367 self.inner.read_snapshot(request, placement).await
368 })
369 }
370
371 fn delete_snapshot<'a>(
372 &'a mut self,
373 request: DeleteSnapshotRequest,
374 placement: ShardPlacement,
375 ) -> GroupDeleteSnapshotFuture<'a> {
376 Box::pin(async move {
377 self.ensure_ready()?;
378 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
379 self.inner.delete_snapshot(request, placement).await
380 })
381 }
382
383 fn bootstrap_stream<'a>(
384 &'a mut self,
385 request: BootstrapStreamRequest,
386 placement: ShardPlacement,
387 ) -> GroupBootstrapStreamFuture<'a> {
388 Box::pin(async move {
389 self.ensure_ready()?;
390 self.commit_access_if_needed(&request.stream_id, request.now_ms, true, placement)?;
391 self.inner.bootstrap_stream(request, placement).await
392 })
393 }
394
395 fn touch_stream_access<'a>(
396 &'a mut self,
397 stream_id: BucketStreamId,
398 now_ms: u64,
399 renew_ttl: bool,
400 placement: ShardPlacement,
401 ) -> GroupTouchStreamAccessFuture<'a> {
402 Box::pin(async move {
403 self.ensure_ready()?;
404 let command = GroupWriteCommand::TouchStreamAccess {
405 stream_id,
406 now_ms,
407 renew_ttl,
408 };
409 let mut preview = self.inner.clone();
410 let response = match preview.apply_committed_write(command.clone(), placement)? {
411 GroupWriteResponse::TouchStreamAccess(response) => response,
412 other => {
413 return Err(GroupEngineError::new(format!(
414 "unexpected touch stream access write response: {other:?}"
415 )));
416 }
417 };
418 if response.changed || response.expired {
419 self.append_record(&command)?;
420 }
421 self.inner = preview;
422 Ok(response)
423 })
424 }
425
426 fn add_fork_ref<'a>(
427 &'a mut self,
428 stream_id: BucketStreamId,
429 now_ms: u64,
430 placement: ShardPlacement,
431 ) -> GroupForkRefFuture<'a> {
432 Box::pin(async move {
433 self.ensure_ready()?;
434 let command = GroupWriteCommand::AddForkRef { stream_id, now_ms };
435 let mut preview = self.inner.clone();
436 let response = match preview.apply_committed_write(command.clone(), placement)? {
437 GroupWriteResponse::AddForkRef(response) => response,
438 other => {
439 return Err(GroupEngineError::new(format!(
440 "unexpected add fork ref write response: {other:?}"
441 )));
442 }
443 };
444 self.append_record(&command)?;
445 self.inner = preview;
446 Ok(response)
447 })
448 }
449
450 fn release_fork_ref<'a>(
451 &'a mut self,
452 stream_id: BucketStreamId,
453 placement: ShardPlacement,
454 ) -> GroupForkRefFuture<'a> {
455 Box::pin(async move {
456 self.ensure_ready()?;
457 let command = GroupWriteCommand::ReleaseForkRef { stream_id };
458 let mut preview = self.inner.clone();
459 let response = match preview.apply_committed_write(command.clone(), placement)? {
460 GroupWriteResponse::ReleaseForkRef(response) => response,
461 other => {
462 return Err(GroupEngineError::new(format!(
463 "unexpected release fork ref write response: {other:?}"
464 )));
465 }
466 };
467 self.append_record(&command)?;
468 self.inner = preview;
469 Ok(response)
470 })
471 }
472
473 fn close_stream<'a>(
474 &'a mut self,
475 request: CloseStreamRequest,
476 placement: ShardPlacement,
477 ) -> GroupCloseStreamFuture<'a> {
478 Box::pin(async move {
479 self.ensure_ready()?;
480 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
481 let command = GroupWriteCommand::from(request);
482 let mut preview = self.inner.clone();
483 let response = match preview.apply_committed_write(command.clone(), placement)? {
484 GroupWriteResponse::CloseStream(response) => response,
485 other => {
486 return Err(GroupEngineError::new(format!(
487 "unexpected close stream write response: {other:?}"
488 )));
489 }
490 };
491 self.append_record(&command)?;
492 self.inner = preview;
493 Ok(response)
494 })
495 }
496
497 fn delete_stream<'a>(
498 &'a mut self,
499 request: DeleteStreamRequest,
500 placement: ShardPlacement,
501 ) -> GroupDeleteStreamFuture<'a> {
502 Box::pin(async move {
503 self.ensure_ready()?;
504 let command = GroupWriteCommand::from(request);
505 let mut preview = self.inner.clone();
506 let response = match preview.apply_committed_write(command.clone(), placement)? {
507 GroupWriteResponse::DeleteStream(response) => response,
508 other => {
509 return Err(GroupEngineError::new(format!(
510 "unexpected delete stream write response: {other:?}"
511 )));
512 }
513 };
514 self.append_record(&command)?;
515 self.inner = preview;
516 Ok(response)
517 })
518 }
519
520 fn append<'a>(
521 &'a mut self,
522 request: AppendRequest,
523 placement: ShardPlacement,
524 ) -> GroupAppendFuture<'a> {
525 Box::pin(async move {
526 self.ensure_ready()?;
527 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
528 let command = GroupWriteCommand::from(request);
529 let mut preview = self.inner.clone();
530 let response = match preview.apply_committed_write(command.clone(), placement)? {
531 GroupWriteResponse::Append(response) => response,
532 other => {
533 return Err(GroupEngineError::new(format!(
534 "unexpected append write response: {other:?}"
535 )));
536 }
537 };
538 self.append_record(&command)?;
539 self.inner = preview;
540 Ok(response)
541 })
542 }
543
544 fn append_with_cold_admission<'a>(
545 &'a mut self,
546 request: AppendRequest,
547 placement: ShardPlacement,
548 admission: ColdWriteAdmission,
549 ) -> GroupAppendFuture<'a> {
550 if !admission.is_enabled() {
551 return self.append(request, placement);
552 }
553 Box::pin(async move {
554 self.ensure_ready()?;
555 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
556 let command = GroupWriteCommand::from(request.clone());
557 let mut preview = self.inner.clone();
558 let response = preview.append_with_admission_inner(request, placement, admission)?;
559 if !response.deduplicated {
560 self.append_record(&command)?;
561 }
562 self.inner = preview;
563 Ok(response)
564 })
565 }
566
567 fn append_batch<'a>(
568 &'a mut self,
569 request: AppendBatchRequest,
570 placement: ShardPlacement,
571 ) -> GroupAppendBatchFuture<'a> {
572 Box::pin(async move {
573 self.ensure_ready()?;
574 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
575 let command = GroupWriteCommand::from(request);
576 let mut preview = self.inner.clone();
577 let response = match preview.apply_committed_write(command.clone(), placement)? {
578 GroupWriteResponse::AppendBatch(response) => response,
579 other => {
580 return Err(GroupEngineError::new(format!(
581 "unexpected append batch write response: {other:?}"
582 )));
583 }
584 };
585 if response
586 .items
587 .iter()
588 .any(|item| matches!(item, Ok(response) if !response.deduplicated))
589 {
590 self.append_record(&command)?;
591 }
592 self.inner = preview;
593 Ok(response)
594 })
595 }
596
597 fn append_batch_with_cold_admission<'a>(
598 &'a mut self,
599 request: AppendBatchRequest,
600 placement: ShardPlacement,
601 admission: ColdWriteAdmission,
602 ) -> GroupAppendBatchFuture<'a> {
603 if !admission.is_enabled() {
604 return self.append_batch(request, placement);
605 }
606 Box::pin(async move {
607 self.ensure_ready()?;
608 self.commit_access_if_needed(&request.stream_id, request.now_ms, false, placement)?;
609 let command = GroupWriteCommand::from(request.clone());
610 let mut preview = self.inner.clone();
611 let response =
612 preview.append_batch_with_admission_inner(request, placement, admission)?;
613 if response
614 .items
615 .iter()
616 .any(|item| matches!(item, Ok(response) if !response.deduplicated))
617 {
618 self.append_record(&command)?;
619 }
620 self.inner = preview;
621 Ok(response)
622 })
623 }
624
625 fn flush_cold<'a>(
626 &'a mut self,
627 request: FlushColdRequest,
628 placement: ShardPlacement,
629 ) -> GroupFlushColdFuture<'a> {
630 Box::pin(async move {
631 self.ensure_ready()?;
632 let command = GroupWriteCommand::from(request);
633 let mut preview = self.inner.clone();
634 let response = match preview.apply_committed_write(command.clone(), placement)? {
635 GroupWriteResponse::FlushCold(response) => response,
636 other => {
637 return Err(GroupEngineError::new(format!(
638 "unexpected flush cold write response: {other:?}"
639 )));
640 }
641 };
642 self.append_record(&command)?;
643 self.inner = preview;
644 Ok(response)
645 })
646 }
647
648 fn plan_cold_flush<'a>(
649 &'a mut self,
650 request: PlanColdFlushRequest,
651 placement: ShardPlacement,
652 ) -> GroupPlanColdFlushFuture<'a> {
653 Box::pin(async move {
654 self.ensure_ready()?;
655 self.inner.plan_cold_flush(request, placement).await
656 })
657 }
658
659 fn plan_next_cold_flush<'a>(
660 &'a mut self,
661 request: PlanGroupColdFlushRequest,
662 placement: ShardPlacement,
663 ) -> GroupPlanNextColdFlushFuture<'a> {
664 Box::pin(async move {
665 self.ensure_ready()?;
666 self.inner.plan_next_cold_flush(request, placement).await
667 })
668 }
669
670 fn plan_next_cold_flush_batch<'a>(
671 &'a mut self,
672 request: PlanGroupColdFlushRequest,
673 placement: ShardPlacement,
674 max_candidates: usize,
675 ) -> GroupPlanNextColdFlushBatchFuture<'a> {
676 Box::pin(async move {
677 self.ensure_ready()?;
678 self.inner
679 .plan_next_cold_flush_batch(request, placement, max_candidates)
680 .await
681 })
682 }
683
684 fn cold_hot_backlog<'a>(
685 &'a mut self,
686 stream_id: BucketStreamId,
687 placement: ShardPlacement,
688 ) -> GroupColdHotBacklogFuture<'a> {
689 Box::pin(async move {
690 self.ensure_ready()?;
691 self.inner.cold_hot_backlog(stream_id, placement).await
692 })
693 }
694
695 fn snapshot<'a>(&'a mut self, placement: ShardPlacement) -> GroupSnapshotFuture<'a> {
696 Box::pin(async move {
697 self.ensure_ready()?;
698 self.inner.snapshot(placement).await
699 })
700 }
701
702 fn install_snapshot<'a>(
703 &'a mut self,
704 snapshot: GroupSnapshot,
705 ) -> GroupInstallSnapshotFuture<'a> {
706 Box::pin(async move {
707 self.ensure_ready()?;
708 let mut preview = self.inner.clone();
709 preview.install_snapshot(snapshot.clone()).await?;
710 self.append_snapshot_record(&snapshot)?;
711 self.inner = preview;
712 Ok(())
713 })
714 }
715}
716
717pub(crate) fn group_log_path(root: &Path, placement: ShardPlacement) -> PathBuf {
718 root.join(format!("core-{}", placement.core_id.0))
719 .join(format!("group-{}.jsonl", placement.raft_group_id.0))
720}
721
722fn replay_group_log(log_path: &Path) -> Result<InMemoryGroupEngine, GroupEngineError> {
723 if !log_path.exists() {
724 return Ok(InMemoryGroupEngine::default());
725 }
726
727 let file = File::open(log_path).map_err(|err| {
728 GroupEngineError::new(format!("open WAL '{}': {err}", log_path.display()))
729 })?;
730 let reader = BufReader::new(file);
731 let mut inner = InMemoryGroupEngine::default();
732 for (line_index, line) in reader.lines().enumerate() {
733 let line = line.map_err(|err| {
734 GroupEngineError::new(format!(
735 "read WAL '{}' line {}: {err}",
736 log_path.display(),
737 line_index + 1
738 ))
739 })?;
740 if line.trim().is_empty() {
741 continue;
742 }
743 if let Ok(record) = serde_json::from_str::<WalRecord>(&line) {
744 match record {
745 WalRecord::Command { command } => inner
746 .apply_replayed_write_command(*command)
747 .map_err(|err| {
748 GroupEngineError::new(format!(
749 "replay WAL command '{}' line {}: {err}",
750 log_path.display(),
751 line_index + 1
752 ))
753 })?,
754 WalRecord::Snapshot {
755 group_commit_index,
756 stream_snapshot,
757 stream_append_counts,
758 } => inner
759 .install_snapshot_parts(
760 group_commit_index,
761 stream_snapshot,
762 stream_append_counts,
763 )
764 .map_err(|err| {
765 GroupEngineError::new(format!(
766 "replay WAL snapshot '{}' line {}: {err}",
767 log_path.display(),
768 line_index + 1
769 ))
770 })?,
771 }
772 continue;
773 }
774
775 let command = serde_json::from_str::<StreamCommand>(&line).map_err(|err| {
776 GroupEngineError::new(format!(
777 "decode WAL '{}' line {}: {err}",
778 log_path.display(),
779 line_index + 1
780 ))
781 })?;
782 inner.apply_replayed_command(command).map_err(|err| {
783 GroupEngineError::new(format!(
784 "replay WAL '{}' line {}: {err}",
785 log_path.display(),
786 line_index + 1
787 ))
788 })?;
789 }
790 Ok(inner)
791}