1use std::sync::Arc;
2use std::time::Instant;
3
4use kaya_core::{DurabilityMode, KayaError, Lsn, Result, SequenceNumber, WalConfig};
5use kaya_io::{Disk, RelativePath};
6use tokio::sync::Mutex;
7
8use crate::batch::{BatchAction, WalBatchWriter};
9use crate::{encode_record, WalPayload, WalRecord};
10
/// Numeric identifier of a WAL segment file.
///
/// Identifiers compare numerically; when a writer is opened, the largest id
/// parsed from the `wal` directory becomes the active segment.
#[derive(Debug, Clone, Copy, Hash)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct SegmentId(pub u64);
13
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct AppendResult {
16 pub lsn: Lsn,
17 pub sequence: SequenceNumber,
18 pub segment_id: SegmentId,
19 pub offset: u64,
20 pub encoded_len: u32,
21 pub durable: bool,
22 pub fsync_duration_us: Option<u64>,
25}
26
27#[derive(Debug)]
28struct WalWriterInner<D: Disk> {
29 disk: Arc<D>,
30 config: WalConfig,
31 active_segment_id: SegmentId,
32 active_path: RelativePath,
33 active_len: u64,
34 next_lsn: Lsn,
35 next_sequence: SequenceNumber,
36 batch: WalBatchWriter,
37}
38
39#[derive(Debug, Clone)]
40pub struct WalWriter<D: Disk> {
41 inner: Arc<Mutex<WalWriterInner<D>>>,
42}
43
44impl<D: Disk> WalWriter<D> {
45 pub async fn open(config: WalConfig, disk: Arc<D>) -> Result<Self> {
46 Self::open_at(config, disk, Lsn::FIRST, SequenceNumber::FIRST).await
47 }
48
49 pub async fn open_at(
50 config: WalConfig,
51 disk: Arc<D>,
52 next_lsn: Lsn,
53 next_sequence: SequenceNumber,
54 ) -> Result<Self> {
55 let wal_dir = RelativePath::new("wal")?;
56 let segments = disk.list_dir(&wal_dir).await?;
57 let active_segment_id = segments
58 .iter()
59 .filter_map(|entry| parse_segment_id(&entry.path))
60 .max()
61 .map(SegmentId)
62 .unwrap_or(SegmentId(1));
63 let active_path = segment_path(active_segment_id)?;
64 let active_len = match disk.file_len(&active_path).await {
65 Ok(len) => len,
66 Err(KayaError::NotFound) => 0,
67 Err(error) => return Err(error),
68 };
69 let batch = WalBatchWriter::new(&config.batch);
70
71 Ok(Self {
72 inner: Arc::new(Mutex::new(WalWriterInner {
73 disk,
74 config,
75 active_segment_id,
76 active_path,
77 active_len,
78 next_lsn,
79 next_sequence,
80 batch,
81 })),
82 })
83 }
84
85 pub async fn append(&self, payload: WalPayload, mode: DurabilityMode) -> Result<AppendResult> {
86 let mut inner = self.inner.lock().await;
87
88 if inner.batch.enabled() && inner.batch.has_pending() && inner.batch.interval_expired() {
89 inner.flush_strict_batch().await?;
90 }
91
92 let record = WalRecord::new(inner.next_lsn, inner.next_sequence, payload);
93 let encoded = encode_record(&record)?;
94 let encoded_len = u32::try_from(encoded.len()).map_err(|_| {
95 KayaError::invalid_argument("encoded WAL record length does not fit into u32")
96 })?;
97 if encoded_len > inner.config.max_record_bytes {
98 return Err(KayaError::invalid_argument(format!(
99 "encoded WAL record exceeds configured max: {encoded_len} > {}",
100 inner.config.max_record_bytes
101 )));
102 }
103
104 if inner.active_len > 0
105 && inner.active_len + u64::from(encoded_len) > inner.config.segment_max_bytes
106 {
107 inner.flush_pending_batch().await?;
108 inner.rotate().await?;
109 }
110
111 let lsn = inner.next_lsn;
112 let sequence = inner.next_sequence;
113 let segment_id = inner.active_segment_id;
114 let offset = inner.disk.append(&inner.active_path, &encoded).await?;
115
116 match mode {
117 DurabilityMode::Relaxed => {
118 inner.active_len = offset + u64::from(encoded_len);
119 inner.next_lsn = inner.next_lsn.next();
120 inner.next_sequence = inner.next_sequence.next();
121 Ok(AppendResult {
122 lsn,
123 sequence,
124 segment_id,
125 offset,
126 encoded_len,
127 durable: false,
128 fsync_duration_us: None,
129 })
130 }
131 DurabilityMode::Strict => {
132 if !inner.batch.enabled() {
133 let fsync_duration_us = inner.flush_strict_batch().await?;
134 inner.active_len = offset + u64::from(encoded_len);
135 inner.next_lsn = inner.next_lsn.next();
136 inner.next_sequence = inner.next_sequence.next();
137 return Ok(AppendResult {
138 lsn,
139 sequence,
140 segment_id,
141 offset,
142 encoded_len,
143 durable: true,
144 fsync_duration_us: Some(fsync_duration_us),
145 });
146 }
147
148 let action = inner.batch.after_record_appended(encoded.len());
149 inner.active_len = offset + u64::from(encoded_len);
150 inner.next_lsn = inner.next_lsn.next();
151 inner.next_sequence = inner.next_sequence.next();
152
153 match action {
154 BatchAction::FlushNow => {
155 let fsync_duration_us = inner.flush_strict_batch().await?;
156 Ok(AppendResult {
157 lsn,
158 sequence,
159 segment_id,
160 offset,
161 encoded_len,
162 durable: true,
163 fsync_duration_us: Some(fsync_duration_us),
164 })
165 }
166 BatchAction::WaitForFlush(rx) => {
167 drop(inner);
168 let fsync_duration_us = rx.await.map_err(|_| {
169 KayaError::internal("WAL batch waiter dropped before group commit")
170 })?;
171 Ok(AppendResult {
172 lsn,
173 sequence,
174 segment_id,
175 offset,
176 encoded_len,
177 durable: true,
178 fsync_duration_us: Some(fsync_duration_us),
179 })
180 }
181 }
182 }
183 }
184 }
185}
186
187impl<D: Disk> WalWriterInner<D> {
188 async fn flush_pending_batch(&mut self) -> Result<()> {
189 if self.batch.has_pending() {
190 self.flush_strict_batch().await?;
191 }
192 Ok(())
193 }
194
195 async fn flush_strict_batch(&mut self) -> Result<u64> {
196 let start = Instant::now();
197 match self.disk.fsync_file(&self.active_path).await {
198 Ok(()) => {
199 let duration_us = start.elapsed().as_micros() as u64;
200 self.batch.complete_flush(duration_us);
201 Ok(duration_us)
202 }
203 Err(error) => {
204 self.batch.fail_flush();
205 Err(error)
206 }
207 }
208 }
209
210 async fn rotate(&mut self) -> Result<()> {
211 self.active_segment_id = SegmentId(self.active_segment_id.0 + 1);
212 self.active_path = segment_path(self.active_segment_id)?;
213 self.active_len = 0;
214 self.disk.fsync_dir(&RelativePath::new("wal")?).await?;
215 Ok(())
216 }
217}
218
219pub fn segment_path(segment_id: SegmentId) -> Result<RelativePath> {
220 RelativePath::new(format!("wal/{:016x}.wal", segment_id.0))
221}
222
223pub fn parse_segment_id(path: &RelativePath) -> Option<u64> {
224 let name = path.file_name()?;
225 let hex = name.strip_suffix(".wal")?;
226 u64::from_str_radix(hex, 16).ok()
227}