1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
use crate::storage::TieredStorage;
use crate::topic_config::CleanupPolicy;
use crate::{Config, Error, Message, Partition, Result, TopicConfigManager};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::sync::RwLock;
use tracing::{info, warn};
/// Topic metadata for persistence.
///
/// Serialized as a JSON array of these entries in the topic metadata file
/// and read back at startup to recreate topics (see `TopicManager::recover`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicMetadata {
    /// Topic name (also used as the on-disk directory name).
    pub name: String,
    /// Number of partitions the topic had when the metadata was persisted.
    pub num_partitions: u32,
    /// Creation timestamp, milliseconds since the Unix epoch.
    /// Set to 0 when the topic was discovered via directory scan and the
    /// real creation time is unknown.
    pub created_at: i64,
}
/// Represents a topic with multiple partitions.
///
/// Cheap to share: callers typically hold an `Arc<Topic>` and clone
/// `Arc<Partition>` handles out of it.
#[derive(Debug)]
pub struct Topic {
    /// Topic name
    name: String,
    /// Partitions in this topic (growable via add_partitions)
    ///
    /// `parking_lot::RwLock` is intentional here — critical sections are
    /// O(1) Vec index lookups and never held across `.await` points.
    /// `tokio::sync::RwLock` would add unnecessary overhead for pure-sync access.
    ///
    /// Invariant: `partitions[i]` is the partition with id `i` — partitions
    /// are only ever appended, never removed or reordered.
    partitions: parking_lot::RwLock<Vec<Arc<Partition>>>,
}
impl Topic {
    /// Create a new topic with the specified number of partitions.
    pub async fn new(config: &Config, name: String, num_partitions: u32) -> Result<Self> {
        Self::new_with_tiered_storage(config, name, num_partitions, None).await
    }

    /// Create a new topic with the specified number of partitions and optional
    /// tiered storage.
    ///
    /// Partition ids are assigned densely from `0..num_partitions`, matching
    /// their index in the internal vector.
    pub async fn new_with_tiered_storage(
        config: &Config,
        name: String,
        num_partitions: u32,
        tiered_storage: Option<Arc<TieredStorage>>,
    ) -> Result<Self> {
        info!(
            "Creating topic '{}' with {} partitions (tiered_storage: {})",
            name,
            num_partitions,
            tiered_storage.is_some()
        );
        let mut partitions = Vec::with_capacity(num_partitions as usize);
        for id in 0..num_partitions {
            partitions.push(Arc::new(
                Partition::new_with_tiered_storage(config, &name, id, tiered_storage.clone())
                    .await?,
            ));
        }
        Ok(Self {
            name,
            partitions: parking_lot::RwLock::new(partitions),
        })
    }

    /// Get the topic name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Get the number of partitions.
    pub fn num_partitions(&self) -> usize {
        self.partitions.read().len()
    }

    /// Get a specific partition.
    ///
    /// # Errors
    /// Returns [`Error::PartitionNotFound`] if `partition_id` is out of range.
    pub fn partition(&self, partition_id: u32) -> Result<Arc<Partition>> {
        self.partitions
            .read()
            .get(partition_id as usize)
            .cloned()
            .ok_or(Error::PartitionNotFound(partition_id))
    }

    /// Append a message to a specific partition, returning its offset.
    pub async fn append(&self, partition_id: u32, message: Message) -> Result<u64> {
        let partition = self.partition(partition_id)?;
        partition.append(message).await
    }

    /// Read up to `max_messages` messages from a specific partition starting
    /// at `start_offset`.
    pub async fn read(
        &self,
        partition_id: u32,
        start_offset: u64,
        max_messages: usize,
    ) -> Result<Vec<Message>> {
        let partition = self.partition(partition_id)?;
        partition.read(start_offset, max_messages).await
    }

    /// Get all partitions (a snapshot of `Arc` handles).
    pub fn all_partitions(&self) -> Vec<Arc<Partition>> {
        self.partitions.read().clone()
    }

    /// Flush all partitions to disk ensuring durability.
    pub async fn flush(&self) -> Result<()> {
        // Clone the Arc handles so the sync lock is released before awaiting.
        let partitions = self.partitions.read().clone();
        for partition in &partitions {
            partition.flush().await?;
        }
        Ok(())
    }

    /// Run log compaction on all partitions.
    ///
    /// For each partition, keeps only the latest message per key and removes
    /// tombstone records (empty value). Only sealed (non-active) segments are
    /// compacted; the currently active segment is left untouched.
    ///
    /// Returns the total number of messages removed across all partitions.
    pub async fn compact(&self) -> Result<usize> {
        // Snapshot the partition handles so the lock is not held across awaits.
        let partitions = self.partitions.read().clone();
        let mut total_removed = 0;
        for partition in &partitions {
            total_removed += partition.compact().await?;
        }
        Ok(total_removed)
    }

    /// Find the first offset with timestamp >= target_timestamp (milliseconds
    /// since epoch). Returns None if no matching offset is found.
    pub async fn find_offset_for_timestamp(
        &self,
        partition_id: u32,
        target_timestamp: i64,
    ) -> Result<Option<u64>> {
        let partition = self.partition(partition_id)?;
        partition.find_offset_for_timestamp(target_timestamp).await
    }

    /// Dynamically add partitions to this topic.
    ///
    /// Creates new partitions with IDs from `current_count` to `new_total - 1`.
    /// Existing partitions and their data are unaffected.
    ///
    /// # Errors
    /// Returns an error if `new_total` does not exceed the current partition
    /// count, or if a concurrent expansion changed the count while the new
    /// partitions were being created (see below).
    pub async fn add_partitions(
        &self,
        config: &Config,
        new_total: u32,
        tiered_storage: Option<Arc<TieredStorage>>,
    ) -> Result<u32> {
        let current_count = self.num_partitions() as u32;
        if new_total <= current_count {
            return Err(Error::Other(format!(
                "New partition count {} must exceed current count {}",
                new_total, current_count
            )));
        }
        // Partition ids are derived from the count observed above. The
        // creation below crosses `.await` points, so the count can change
        // under a concurrent call — we re-validate before committing.
        let mut new_partitions = Vec::with_capacity((new_total - current_count) as usize);
        for id in current_count..new_total {
            new_partitions.push(Arc::new(
                Partition::new_with_tiered_storage(config, &self.name, id, tiered_storage.clone())
                    .await?,
            ));
        }
        let added = new_partitions.len() as u32;
        {
            let mut partitions = self.partitions.write();
            // TOCTOU guard: if another add_partitions call committed first,
            // blindly extending would insert duplicate partition ids at the
            // wrong indices, breaking the `partitions[i].id == i` invariant.
            // Fail loudly instead; the caller can retry with a fresh count.
            if partitions.len() as u32 != current_count {
                return Err(Error::Other(format!(
                    "Concurrent partition expansion detected on topic '{}': expected {} partitions, found {}",
                    self.name,
                    current_count,
                    partitions.len()
                )));
            }
            partitions.extend(new_partitions);
        }
        info!(
            "Added {} partitions to topic '{}' (total: {})",
            added, self.name, new_total
        );
        Ok(added)
    }
}
/// Manages all topics in the system.
///
/// `Clone` is cheap: the topic registry is behind a shared `Arc`, so clones
/// observe the same set of topics.
#[derive(Debug, Clone)]
pub struct TopicManager {
    /// Topic registry. `tokio::sync::RwLock` (not `parking_lot`) because
    /// creation/recovery paths hold the lock across `.await` points.
    topics: Arc<RwLock<HashMap<String, Arc<Topic>>>>,
    /// Broker configuration (data dir, default partition count, persistence flag).
    config: Config,
    /// Optional tiered storage backend propagated to every partition created.
    tiered_storage: Option<Arc<TieredStorage>>,
}
/// Metadata file name for topic persistence. Lives directly under
/// `config.data_dir`; written atomically by `persist_metadata` and read
/// back during `recover`.
const TOPIC_METADATA_FILE: &str = "topic_metadata.json";
impl TopicManager {
    /// Create a new topic manager and recover any existing topics from disk
    pub fn new(config: Config) -> Self {
        info!(
            "Creating TopicManager with {} default partitions (tiered_storage: disabled)",
            config.default_partitions
        );
        Self {
            topics: Arc::new(RwLock::new(HashMap::new())),
            config,
            tiered_storage: None,
        }
    }

    /// Create a new topic manager with tiered storage support
    pub fn new_with_tiered_storage(config: Config, tiered_storage: Arc<TieredStorage>) -> Self {
        info!(
            "Creating TopicManager with {} default partitions (tiered_storage: enabled)",
            config.default_partitions
        );
        Self {
            topics: Arc::new(RwLock::new(HashMap::new())),
            config,
            tiered_storage: Some(tiered_storage),
        }
    }

    /// Check if tiered storage is enabled
    pub fn has_tiered_storage(&self) -> bool {
        self.tiered_storage.is_some()
    }

    /// Get tiered storage statistics
    ///
    /// Returns `None` when tiered storage is disabled.
    pub fn tiered_storage_stats(&self) -> Option<crate::storage::TieredStorageStatsSnapshot> {
        self.tiered_storage.as_ref().map(|ts| ts.stats())
    }

    /// Initialize and recover topics from disk
    /// This should be called after construction to restore persisted topics
    ///
    /// Recovery strategy: prefer the JSON metadata file; if it is missing or
    /// unreadable/unparseable, fall back to scanning the data directory.
    ///
    /// NOTE(review): the returned count is the number of metadata entries,
    /// not the number of topics successfully recovered — individual failures
    /// are only logged. Confirm callers don't rely on it as a success count.
    pub async fn recover(&self) -> Result<usize> {
        if !self.config.enable_persistence {
            info!("Persistence disabled, skipping topic recovery");
            return Ok(0);
        }
        let data_dir = PathBuf::from(&self.config.data_dir);
        let metadata_path = data_dir.join(TOPIC_METADATA_FILE);
        // Try to load metadata file
        if metadata_path.exists() {
            match fs::read_to_string(&metadata_path).await {
                Ok(content) => match serde_json::from_str::<Vec<TopicMetadata>>(&content) {
                    Ok(topics_metadata) => {
                        let count = topics_metadata.len();
                        info!("Recovering {} topics from metadata file", count);
                        for meta in topics_metadata {
                            // Per-topic failures are non-fatal: log and continue
                            // recovering the remaining topics.
                            if let Err(e) = self.recover_topic(&meta).await {
                                warn!("Failed to recover topic '{}': {}", meta.name, e);
                            }
                        }
                        return Ok(count);
                    }
                    Err(e) => {
                        warn!("Failed to parse topic metadata: {}", e);
                    }
                },
                Err(e) => {
                    warn!("Failed to read topic metadata file: {}", e);
                }
            }
        }
        // Fallback: scan data directory for topic directories
        self.recover_from_directory_scan().await
    }

    /// Recover a single topic from metadata
    ///
    /// Idempotent: returns `Ok(())` without re-creating the topic if it is
    /// already present in the registry.
    async fn recover_topic(&self, meta: &TopicMetadata) -> Result<()> {
        // Write lock is held across the topic creation awaits so that a
        // concurrent recovery of the same topic cannot double-create it.
        let mut topics = self.topics.write().await;
        if topics.contains_key(&meta.name) {
            return Ok(()); // Already recovered
        }
        info!(
            "Recovering topic '{}' with {} partitions",
            meta.name, meta.num_partitions
        );
        let topic = Arc::new(
            Topic::new_with_tiered_storage(
                &self.config,
                meta.name.clone(),
                meta.num_partitions,
                self.tiered_storage.clone(),
            )
            .await?,
        );
        topics.insert(meta.name.clone(), topic);
        Ok(())
    }

    /// Scan data directory for existing topic directories (fallback recovery)
    ///
    /// A directory counts as a topic if it contains at least one
    /// `partition-*` subdirectory. All I/O errors here are deliberately
    /// non-fatal: this is best-effort recovery.
    async fn recover_from_directory_scan(&self) -> Result<usize> {
        let data_dir = PathBuf::from(&self.config.data_dir);
        if !data_dir.exists() {
            return Ok(0);
        }
        let mut recovered = 0;
        let mut entries = match fs::read_dir(&data_dir).await {
            Ok(entries) => entries,
            Err(e) => {
                warn!("Failed to read data directory: {}", e);
                return Ok(0);
            }
        };
        while let Ok(Some(entry)) = entries.next_entry().await {
            let path = entry.path();
            if !path.is_dir() {
                continue;
            }
            let dir_name = match path.file_name().and_then(|n| n.to_str()) {
                Some(name) => name.to_string(),
                None => continue,
            };
            // Skip internal directories
            if dir_name.starts_with('_') || dir_name.starts_with('.') {
                continue;
            }
            // Check if this looks like a topic directory by looking for partition subdirs
            let mut partition_count = 0u32;
            if let Ok(mut topic_entries) = fs::read_dir(&path).await {
                while let Ok(Some(partition_entry)) = topic_entries.next_entry().await {
                    let partition_path = partition_entry.path();
                    if partition_path.is_dir() {
                        if let Some(name) = partition_path.file_name().and_then(|n| n.to_str()) {
                            if name.starts_with("partition-") {
                                partition_count += 1;
                            }
                        }
                    }
                }
            }
            if partition_count > 0 {
                info!(
                    "Discovered topic '{}' with {} partitions from directory scan",
                    dir_name, partition_count
                );
                let meta = TopicMetadata {
                    name: dir_name,
                    num_partitions: partition_count,
                    created_at: 0, // Unknown
                };
                if let Err(e) = self.recover_topic(&meta).await {
                    warn!("Failed to recover topic '{}': {}", meta.name, e);
                } else {
                    recovered += 1;
                }
            }
        }
        // Save discovered topics to metadata file for faster recovery next time
        if recovered > 0 {
            if let Err(e) = self.persist_metadata().await {
                warn!("Failed to persist topic metadata after recovery: {}", e);
            }
        }
        Ok(recovered)
    }

    /// Persist topic metadata to disk
    ///
    /// No-op when persistence is disabled. Uses the classic crash-safe
    /// sequence: write temp file → fsync temp → rename over target →
    /// fsync parent directory.
    ///
    /// NOTE(review): `created_at` is refreshed to `now()` for every topic on
    /// every persist, so the original creation time is not preserved across
    /// restarts — confirm whether that is intended.
    async fn persist_metadata(&self) -> Result<()> {
        if !self.config.enable_persistence {
            return Ok(());
        }
        let data_dir = PathBuf::from(&self.config.data_dir);
        fs::create_dir_all(&data_dir)
            .await
            .map_err(|e| Error::Other(format!("Failed to create data directory: {}", e)))?;
        let topics = self.topics.read().await;
        let metadata: Vec<TopicMetadata> = topics
            .iter()
            .map(|(name, topic)| TopicMetadata {
                name: name.clone(),
                num_partitions: topic.num_partitions() as u32,
                created_at: chrono::Utc::now().timestamp_millis(),
            })
            .collect();
        let metadata_path = data_dir.join(TOPIC_METADATA_FILE);
        let tmp_path = data_dir.join(format!("{}.tmp", TOPIC_METADATA_FILE));
        let content = serde_json::to_string_pretty(&metadata)
            .map_err(|e| Error::Other(format!("Failed to serialize topic metadata: {}", e)))?;
        // Atomic write via temp file + fsync + rename.
        // A crash during write only corrupts the temp file; the original
        // metadata file remains intact for recovery.
        fs::write(&tmp_path, &content)
            .await
            .map_err(|e| Error::Other(format!("Failed to write topic metadata temp: {}", e)))?;
        // fsync the temp file before rename to ensure data is on durable storage.
        // Without this, a crash after rename could leave a zero-length or
        // partially-written metadata file.
        {
            let tmp_file = fs::File::open(&tmp_path)
                .await
                .map_err(|e| Error::Other(format!("Failed to open temp file for fsync: {}", e)))?;
            tmp_file
                .sync_all()
                .await
                .map_err(|e| Error::Other(format!("Failed to fsync temp file: {}", e)))?;
        }
        fs::rename(&tmp_path, &metadata_path)
            .await
            .map_err(|e| Error::Other(format!("Failed to rename topic metadata: {}", e)))?;
        // fsync the parent directory to ensure the rename (directory entry update)
        // is durable. Without this, a crash could revert the rename.
        // (Synchronous std::fs here is acceptable: a directory fsync is quick
        // and this path is not hot.)
        {
            let dir_file = std::fs::File::open(&data_dir)
                .map_err(|e| Error::Other(format!("Failed to open data dir for fsync: {}", e)))?;
            dir_file
                .sync_all()
                .map_err(|e| Error::Other(format!("Failed to fsync data dir: {}", e)))?;
        }
        info!("Persisted metadata for {} topics", topics.len());
        Ok(())
    }

    /// Create a new topic
    ///
    /// `num_partitions` defaults to `config.default_partitions` when `None`.
    ///
    /// # Errors
    /// Fails if a topic with the same name already exists or partition
    /// creation fails.
    pub async fn create_topic(
        &self,
        name: String,
        num_partitions: Option<u32>,
    ) -> Result<Arc<Topic>> {
        let mut topics = self.topics.write().await;
        if topics.contains_key(&name) {
            return Err(Error::Other(format!("Topic '{}' already exists", name)));
        }
        let num_partitions = num_partitions.unwrap_or(self.config.default_partitions);
        let topic = Arc::new(
            Topic::new_with_tiered_storage(
                &self.config,
                name.clone(),
                num_partitions,
                self.tiered_storage.clone(),
            )
            .await?,
        );
        topics.insert(name.clone(), topic.clone());
        drop(topics); // Release lock before persistence
        // Persist metadata asynchronously; failure is logged, not returned —
        // the in-memory topic is already live.
        if let Err(e) = self.persist_metadata().await {
            warn!("Failed to persist topic metadata after create_topic: {}", e);
        }
        Ok(topic)
    }

    /// Get a topic by name
    pub async fn get_topic(&self, name: &str) -> Result<Arc<Topic>> {
        let topics = self.topics.read().await;
        topics
            .get(name)
            .cloned()
            .ok_or_else(|| Error::TopicNotFound(name.to_string()))
    }

    /// Get or create a topic (race-safe: uses write lock directly)
    pub async fn get_or_create_topic(&self, name: String) -> Result<Arc<Topic>> {
        // Use write lock to atomically check-and-create, avoiding TOCTOU race
        let mut topics = self.topics.write().await;
        if let Some(topic) = topics.get(&name) {
            return Ok(topic.clone());
        }
        let num_partitions = self.config.default_partitions;
        let topic = Arc::new(
            Topic::new_with_tiered_storage(
                &self.config,
                name.clone(),
                num_partitions,
                self.tiered_storage.clone(),
            )
            .await?,
        );
        topics.insert(name.clone(), topic.clone());
        drop(topics); // Release lock before persistence
        // Persist metadata asynchronously; failure is logged, not returned.
        if let Err(e) = self.persist_metadata().await {
            warn!(
                "Failed to persist topic metadata after get_or_create_topic: {}",
                e
            );
        }
        Ok(topic)
    }

    /// List all topics
    ///
    /// Order is unspecified (HashMap iteration order).
    pub async fn list_topics(&self) -> Vec<String> {
        let topics = self.topics.read().await;
        topics.keys().cloned().collect()
    }

    /// Delete a topic
    ///
    /// Removes the topic from the in-memory registry and updates the
    /// persisted metadata. NOTE(review): on-disk partition data does not
    /// appear to be removed here — confirm whether that is handled elsewhere.
    pub async fn delete_topic(&self, name: &str) -> Result<()> {
        let mut topics = self.topics.write().await;
        topics
            .remove(name)
            .ok_or_else(|| Error::TopicNotFound(name.to_string()))?;
        drop(topics); // Release lock before persistence
        info!("Deleted topic '{}'", name);
        // Update persisted metadata
        if let Err(e) = self.persist_metadata().await {
            warn!("Failed to persist topic metadata after delete_topic: {}", e);
        }
        Ok(())
    }

    /// Flush all topics to disk ensuring durability during shutdown
    ///
    /// Fails fast on the first flush error.
    pub async fn flush_all(&self) -> Result<()> {
        let topics = self.topics.read().await;
        for (name, topic) in topics.iter() {
            info!("Flushing topic '{}'...", name);
            topic.flush().await?;
        }
        Ok(())
    }

    /// Run log compaction on topics whose cleanup policy includes compaction.
    ///
    /// Iterates all topics, checks the policy via `topic_config_manager`, and
    /// compacts partitions where the policy is `Compact` or `CompactDelete`.
    /// Returns the total number of messages removed across all eligible topics.
    ///
    /// Per-topic compaction errors are logged and skipped so one failing
    /// topic cannot block compaction of the rest.
    pub async fn compact_topics(&self, topic_config_manager: &TopicConfigManager) -> Result<usize> {
        let topics = self.topics.read().await;
        let mut total_removed = 0usize;
        for (name, topic) in topics.iter() {
            let config = topic_config_manager.get_or_default(name);
            match config.cleanup_policy {
                CleanupPolicy::Compact | CleanupPolicy::CompactDelete => {
                    match topic.compact().await {
                        Ok(removed) => {
                            if removed > 0 {
                                info!(
                                    topic = %name,
                                    removed,
                                    "Log compaction completed"
                                );
                            }
                            total_removed += removed;
                        }
                        Err(e) => {
                            warn!(
                                topic = %name,
                                error = %e,
                                "Log compaction failed"
                            );
                        }
                    }
                }
                CleanupPolicy::Delete => {
                    // Delete-only policy — no compaction needed
                }
            }
        }
        Ok(total_removed)
    }

    /// Add partitions to an existing topic.
    ///
    /// Increases the partition count of the topic to `new_partition_count`.
    /// Returns the number of partitions actually added.
    pub async fn add_partitions(&self, name: &str, new_partition_count: u32) -> Result<u32> {
        // Clone the Arc and drop the registry lock before the (potentially
        // slow) partition-creation awaits inside Topic::add_partitions.
        let topics = self.topics.read().await;
        let topic = topics
            .get(name)
            .ok_or_else(|| Error::TopicNotFound(name.to_string()))?
            .clone();
        drop(topics);
        let added = topic
            .add_partitions(
                &self.config,
                new_partition_count,
                self.tiered_storage.clone(),
            )
            .await?;
        // Update persisted metadata
        if let Err(e) = self.persist_metadata().await {
            warn!(
                "Failed to persist topic metadata after add_partitions: {}",
                e
            );
        }
        Ok(added)
    }

    /// §3.3: Build a WAL record payload for a topic write.
    ///
    /// Encodes the message as:
    /// `[topic_name_len:u32 BE][topic_name:bytes][partition_id:u32 BE][serialized_message:remaining]`
    ///
    /// The `serialized_message` portion is a postcard-encoded `Message` struct,
    /// preserving key, headers, producer metadata, and transaction markers
    /// through WAL replay.
    ///
    /// Must stay in sync with the decoder in `apply_wal_record`.
    pub fn build_wal_record(
        topic_name: &str,
        partition_id: u32,
        message: &Message,
    ) -> Result<bytes::Bytes> {
        use bytes::BufMut;
        let msg_bytes = message.to_bytes()?;
        let name_bytes = topic_name.as_bytes();
        // Guard against topic names exceeding u32::MAX bytes.
        let name_len = u32::try_from(name_bytes.len()).map_err(|_| {
            Error::Other(format!(
                "Topic name too long for WAL record: {} bytes",
                name_bytes.len()
            ))
        })?;
        let total = 4 + name_bytes.len() + 4 + msg_bytes.len();
        let mut buf = bytes::BytesMut::with_capacity(total);
        // BufMut::put_u32 writes big-endian, matching the documented layout.
        buf.put_u32(name_len);
        buf.put_slice(name_bytes);
        buf.put_u32(partition_id);
        buf.put_slice(&msg_bytes);
        Ok(buf.freeze())
    }

    /// Replay WAL records at startup and checkpoint afterwards.
    ///
    /// This is the single entry point for WAL replay, used by both
    /// `ClusterServer` and `SecureServer`. It:
    ///
    /// 1. Filters out non-data records (transaction control, checkpoints)
    /// 2. Skips already-replayed records via a persisted LSN marker
    ///    (prevents duplicates if the post-replay checkpoint fails)
    /// 3. Applies remaining data records to partitions
    /// 4. Persists the max replayed LSN marker
    /// 5. Checkpoints (deletes) WAL files
    ///
    /// # Errors
    /// Fails (aborting startup) if WAL files cannot be read or a data record
    /// cannot be applied; transaction-marker and checkpoint failures are
    /// logged but non-fatal.
    pub async fn replay_wal_and_checkpoint(&self, core_config: &crate::Config) -> Result<()> {
        let wal_dir = std::path::PathBuf::from(&core_config.data_dir).join("wal");
        if !wal_dir.exists() {
            return Ok(());
        }
        let wal_config = crate::WalConfig {
            dir: wal_dir.clone(),
            ..Default::default()
        };
        // Load the last successfully replayed LSN from the marker file.
        // This prevents duplicate replay when checkpoint_dir() fails.
        // A missing/unreadable/unparseable marker degrades to 0 (replay all).
        let marker_path = wal_dir.join(".last_replayed_lsn");
        let last_replayed_lsn: u64 = if marker_path.exists() {
            std::fs::read_to_string(&marker_path)
                .ok()
                .and_then(|s| s.trim().parse().ok())
                .unwrap_or(0)
        } else {
            0
        };
        match crate::GroupCommitWal::replay_all(&wal_config).await {
            Ok(records) if !records.is_empty() => {
                // Split into data records and transaction control records.
                // Data records (Full/First/Middle/Last) are replayed into
                // partitions. TxnCommit/TxnAbort records carry the list of
                // affected partitions and trigger COMMIT/ABORT marker writes.
                let data_records: Vec<_> = records
                    .iter()
                    .filter(|r| {
                        matches!(
                            r.record_type,
                            crate::RecordType::Full
                                | crate::RecordType::First
                                | crate::RecordType::Middle
                                | crate::RecordType::Last
                        )
                    })
                    .filter(|r| r.lsn > last_replayed_lsn)
                    .collect();
                let txn_control_records: Vec<_> = records
                    .iter()
                    .filter(|r| {
                        matches!(
                            r.record_type,
                            crate::RecordType::TxnCommit | crate::RecordType::TxnAbort
                        )
                    })
                    .filter(|r| r.lsn > last_replayed_lsn)
                    .collect();
                // Phase 1: Replay data records into partitions.
                if data_records.is_empty() {
                    tracing::debug!(
                        total = records.len(),
                        "WAL replay: all data records already replayed or non-data, skipping"
                    );
                } else {
                    tracing::info!(
                        total = records.len(),
                        data = data_records.len(),
                        txn_control = txn_control_records.len(),
                        skipped = records.len() - data_records.len() - txn_control_records.len(),
                        "WAL replay: applying {} data records at startup",
                        data_records.len()
                    );
                    for record in &data_records {
                        // Any failure here is fatal: continuing would silently
                        // drop acknowledged writes.
                        if let Err(e) = self.apply_wal_record(record).await {
                            return Err(Error::Other(format!(
                                "WAL replay: failed to apply record (lsn={}): {}. \
                                 Aborting startup to prevent data loss. \
                                 Inspect the WAL files in {} and retry, \
                                 or remove them to accept data loss.",
                                record.lsn,
                                e,
                                wal_dir.display()
                            )));
                        }
                    }
                }
                // Phase 2 (WAL-002): Replay TxnCommit/TxnAbort records.
                // Re-write COMMIT/ABORT markers to the partitions listed
                // in the TxnWalPayload. Without this, read_committed
                // consumers would see replayed data as uncommitted.
                for record in &txn_control_records {
                    match postcard::from_bytes::<crate::TxnWalPayload>(&record.data) {
                        Ok(payload) => {
                            let marker_kind = if record.record_type == crate::RecordType::TxnCommit
                            {
                                crate::TransactionMarker::Commit
                            } else {
                                crate::TransactionMarker::Abort
                            };
                            let marker_label = if marker_kind == crate::TransactionMarker::Commit {
                                "COMMIT"
                            } else {
                                "ABORT"
                            };
                            for (topic_name, partition_id) in &payload.partitions {
                                // Missing topic/partition is expected if the
                                // topic was deleted after the txn was logged —
                                // skip quietly at debug level.
                                match self.get_topic(topic_name).await {
                                    Ok(topic_obj) => match topic_obj.partition(*partition_id) {
                                        Ok(partition) => {
                                            // Empty-value marker message carrying only the
                                            // transaction metadata.
                                            let marker = Message {
                                                producer_id: Some(payload.producer_id),
                                                transaction_marker: Some(marker_kind),
                                                is_transactional: true,
                                                ..Message::new(bytes::Bytes::new())
                                            };
                                            if let Err(e) = partition.append(marker).await {
                                                tracing::warn!(
                                                    txn_id = %payload.txn_id,
                                                    topic = %topic_name,
                                                    partition = partition_id,
                                                    error = %e,
                                                    "WAL replay: failed to write {} marker — \
                                                     read_committed consumers may see stale state",
                                                    marker_label
                                                );
                                            } else {
                                                tracing::trace!(
                                                    txn_id = %payload.txn_id,
                                                    topic = %topic_name,
                                                    partition = partition_id,
                                                    "WAL replay: wrote {} marker",
                                                    marker_label
                                                );
                                            }
                                        }
                                        Err(_) => {
                                            tracing::debug!(
                                                txn_id = %payload.txn_id,
                                                topic = %topic_name,
                                                partition = partition_id,
                                                "WAL replay: partition not found for {} marker, skipping",
                                                marker_label
                                            );
                                        }
                                    },
                                    Err(_) => {
                                        tracing::debug!(
                                            txn_id = %payload.txn_id,
                                            topic = %topic_name,
                                            "WAL replay: topic not found for {} marker, skipping",
                                            marker_label
                                        );
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                lsn = record.lsn,
                                error = %e,
                                "WAL replay: failed to deserialise TxnWalPayload — \
                                 skipping transaction control record"
                            );
                        }
                    }
                }
                // Persist the max LSN marker BEFORE checkpoint so a failed
                // checkpoint doesn't cause duplicate replay on next startup.
                let max_lsn = records.iter().map(|r| r.lsn).max().unwrap_or(0);
                if max_lsn > last_replayed_lsn {
                    if let Err(e) = std::fs::write(&marker_path, max_lsn.to_string()) {
                        tracing::warn!(error = %e, "Failed to write WAL replay LSN marker");
                    }
                }
                // Checkpoint: delete WAL files.
                if let Err(e) = crate::GroupCommitWal::checkpoint_dir(&wal_config.dir) {
                    tracing::warn!(
                        error = %e,
                        "WAL post-replay checkpoint failed — \
                         duplicate replay prevented by LSN marker"
                    );
                }
            }
            Ok(_) => {
                tracing::debug!("WAL replay: no records to replay");
            }
            Err(e) => {
                return Err(Error::Other(format!(
                    "WAL replay: failed to read WAL files: {}. \
                     Check WAL files in {} and retry, or remove them to accept data loss.",
                    e,
                    wal_dir.display()
                )));
            }
        }
        Ok(())
    }

    /// §3.3: Apply a single WAL record during startup replay.
    ///
    /// WAL records encode a topic write as:
    /// `[topic_name_len:u32][topic_name:bytes][partition_id:u32][message_data:remaining]`
    /// (the format produced by `build_wal_record`).
    ///
    /// Records whose topic or partition don't exist are silently skipped (the
    /// topic was deleted after the WAL entry was written).
    ///
    /// **Important:** This function always appends; deduplication is handled
    /// by the caller (`replay_wal_and_checkpoint`) via an LSN marker.
    /// Non-data records (TxnCommit, TxnAbort, etc.) must be filtered out
    /// before calling this function.
    pub async fn apply_wal_record(&self, record: &crate::WalRecord) -> Result<()> {
        use bytes::Buf;
        let data = &record.data[..];
        // Minimum size: 4-byte name length + 4-byte partition id.
        if data.len() < 8 {
            return Err(Error::Other(format!(
                "WAL record too short: {} bytes",
                data.len()
            )));
        }
        let mut cursor = std::io::Cursor::new(data);
        let name_len = cursor.get_u32() as usize;
        // Ensure the name and the partition id fit before slicing below.
        if cursor.remaining() < name_len + 4 {
            return Err(Error::Other("WAL record truncated".to_string()));
        }
        let name_bytes = &data[4..4 + name_len];
        let topic_name = std::str::from_utf8(name_bytes)
            .map_err(|e| Error::Other(format!("Invalid topic name in WAL: {}", e)))?;
        cursor.set_position((4 + name_len) as u64);
        let partition_id = cursor.get_u32();
        let msg_start = 4 + name_len + 4;
        let msg_data = bytes::Bytes::copy_from_slice(&data[msg_start..]);
        // Look up topic — if deleted, skip silently
        let topic = match self.get_topic(topic_name).await {
            Ok(t) => t,
            Err(_) => {
                tracing::debug!(
                    topic = %topic_name,
                    lsn = record.lsn,
                    "WAL replay: topic not found, skipping"
                );
                return Ok(());
            }
        };
        // Deserialise the full Message (key, headers, producer metadata, etc.)
        // from postcard encoding. No backward-compatible fallback — corrupt
        // or legacy records are treated as errors because a raw-value Message
        // would have offset=0, causing all records to overwrite each other
        // when replayed via append_replicated().
        let message = Message::from_bytes(&msg_data).map_err(|e| {
            Error::Other(format!(
                "WAL replay: Message::from_bytes failed for topic={} partition={} lsn={}: {}. \
                 The WAL record may be corrupt or written by an incompatible version. \
                 Inspect the WAL files and retry, or remove them to accept data loss.",
                topic_name, partition_id, record.lsn, e
            ))
        })?;
        // WAL-001: Use append_replicated() instead of append().
        // topic.append() allocates a NEW offset via next_offset.fetch_add(1),
        // discarding the original leader-assigned offset stored in the message.
        // append_replicated() uses fetch_max to preserve the original offset,
        // which is critical for WAL replay correctness — replayed records must
        // land at the same offsets they had before the crash.
        let partition = topic.partition(partition_id)?;
        match partition.append_replicated(message).await {
            Ok(offset) => {
                tracing::trace!(
                    topic = %topic_name,
                    partition = partition_id,
                    offset,
                    lsn = record.lsn,
                    "WAL replay: applied record"
                );
                Ok(())
            }
            Err(e) => {
                tracing::warn!(
                    topic = %topic_name,
                    partition = partition_id,
                    lsn = record.lsn,
                    error = %e,
                    "WAL replay: failed to apply record"
                );
                Err(Error::Other(format!(
                    "WAL replay failed for {}/{}: {}",
                    topic_name, partition_id, e
                )))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Build a throwaway config whose data directory is unique per test run,
    /// so parallel tests never share on-disk state.
    fn get_test_config() -> Config {
        let unique_dir = format!("/tmp/rivven-test-{}", uuid::Uuid::new_v4());
        Config {
            data_dir: unique_dir,
            ..Default::default()
        }
    }

    /// A freshly created topic reports the requested name and partition count.
    #[tokio::test]
    async fn test_topic_creation() {
        let cfg = get_test_config();
        let t = Topic::new(&cfg, "test-topic".to_string(), 3).await.unwrap();
        assert_eq!("test-topic", t.name());
        assert_eq!(3, t.num_partitions());
    }

    /// Appending to partition 0 yields offset 0 and the message reads back.
    #[tokio::test]
    async fn test_topic_append_and_read() {
        let cfg = get_test_config();
        let t = Topic::new(&cfg, "test-topic".to_string(), 2).await.unwrap();
        let first_offset = t.append(0, Message::new(Bytes::from("test"))).await.unwrap();
        assert_eq!(first_offset, 0);
        let fetched = t.read(0, 0, 10).await.unwrap();
        assert_eq!(fetched.len(), 1);
    }

    /// Manager creates a topic with the default partition count, then the
    /// topic is retrievable and listed.
    #[tokio::test]
    async fn test_topic_manager() {
        let manager = TopicManager::new(get_test_config());
        let created = manager.create_topic("test".to_string(), None).await.unwrap();
        assert_eq!(created.num_partitions(), 3);
        assert_eq!(manager.get_topic("test").await.unwrap().name(), "test");
        assert_eq!(manager.list_topics().await.len(), 1);
    }
}