Skip to main content

kaya_wal/
writer.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use kaya_core::{DurabilityMode, KayaError, Lsn, Result, SequenceNumber, WalConfig};
5use kaya_io::{Disk, RelativePath};
6use tokio::sync::Mutex;
7
8use crate::batch::{BatchAction, WalBatchWriter};
9use crate::{encode_record, WalPayload, WalRecord};
10
/// Numeric identifier of a WAL segment file.
///
/// Ordering follows the numeric value, which is what lets the writer pick the
/// highest-numbered segment as the active one when reopening.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct SegmentId(pub u64);
13
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct AppendResult {
16    pub lsn: Lsn,
17    pub sequence: SequenceNumber,
18    pub segment_id: SegmentId,
19    pub offset: u64,
20    pub encoded_len: u32,
21    pub durable: bool,
22    /// Duration of the fsync_file that made this append durable (Strict mode only).
23    /// None for Relaxed or if timing was not captured.
24    pub fsync_duration_us: Option<u64>,
25}
26
27#[derive(Debug)]
28struct WalWriterInner<D: Disk> {
29    disk: Arc<D>,
30    config: WalConfig,
31    active_segment_id: SegmentId,
32    active_path: RelativePath,
33    active_len: u64,
34    next_lsn: Lsn,
35    next_sequence: SequenceNumber,
36    batch: WalBatchWriter,
37}
38
39#[derive(Debug, Clone)]
40pub struct WalWriter<D: Disk> {
41    inner: Arc<Mutex<WalWriterInner<D>>>,
42}
43
44impl<D: Disk> WalWriter<D> {
45    pub async fn open(config: WalConfig, disk: Arc<D>) -> Result<Self> {
46        Self::open_at(config, disk, Lsn::FIRST, SequenceNumber::FIRST).await
47    }
48
49    pub async fn open_at(
50        config: WalConfig,
51        disk: Arc<D>,
52        next_lsn: Lsn,
53        next_sequence: SequenceNumber,
54    ) -> Result<Self> {
55        let wal_dir = RelativePath::new("wal")?;
56        let segments = disk.list_dir(&wal_dir).await?;
57        let active_segment_id = segments
58            .iter()
59            .filter_map(|entry| parse_segment_id(&entry.path))
60            .max()
61            .map(SegmentId)
62            .unwrap_or(SegmentId(1));
63        let active_path = segment_path(active_segment_id)?;
64        let active_len = match disk.file_len(&active_path).await {
65            Ok(len) => len,
66            Err(KayaError::NotFound) => 0,
67            Err(error) => return Err(error),
68        };
69        let batch = WalBatchWriter::new(&config.batch);
70
71        Ok(Self {
72            inner: Arc::new(Mutex::new(WalWriterInner {
73                disk,
74                config,
75                active_segment_id,
76                active_path,
77                active_len,
78                next_lsn,
79                next_sequence,
80                batch,
81            })),
82        })
83    }
84
85    pub async fn append(&self, payload: WalPayload, mode: DurabilityMode) -> Result<AppendResult> {
86        let mut inner = self.inner.lock().await;
87
88        if inner.batch.enabled() && inner.batch.has_pending() && inner.batch.interval_expired() {
89            inner.flush_strict_batch().await?;
90        }
91
92        let record = WalRecord::new(inner.next_lsn, inner.next_sequence, payload);
93        let encoded = encode_record(&record)?;
94        let encoded_len = u32::try_from(encoded.len()).map_err(|_| {
95            KayaError::invalid_argument("encoded WAL record length does not fit into u32")
96        })?;
97        if encoded_len > inner.config.max_record_bytes {
98            return Err(KayaError::invalid_argument(format!(
99                "encoded WAL record exceeds configured max: {encoded_len} > {}",
100                inner.config.max_record_bytes
101            )));
102        }
103
104        if inner.active_len > 0
105            && inner.active_len + u64::from(encoded_len) > inner.config.segment_max_bytes
106        {
107            inner.flush_pending_batch().await?;
108            inner.rotate().await?;
109        }
110
111        let lsn = inner.next_lsn;
112        let sequence = inner.next_sequence;
113        let segment_id = inner.active_segment_id;
114        let offset = inner.disk.append(&inner.active_path, &encoded).await?;
115
116        match mode {
117            DurabilityMode::Relaxed => {
118                inner.active_len = offset + u64::from(encoded_len);
119                inner.next_lsn = inner.next_lsn.next();
120                inner.next_sequence = inner.next_sequence.next();
121                Ok(AppendResult {
122                    lsn,
123                    sequence,
124                    segment_id,
125                    offset,
126                    encoded_len,
127                    durable: false,
128                    fsync_duration_us: None,
129                })
130            }
131            DurabilityMode::Strict => {
132                if !inner.batch.enabled() {
133                    let fsync_duration_us = inner.flush_strict_batch().await?;
134                    inner.active_len = offset + u64::from(encoded_len);
135                    inner.next_lsn = inner.next_lsn.next();
136                    inner.next_sequence = inner.next_sequence.next();
137                    return Ok(AppendResult {
138                        lsn,
139                        sequence,
140                        segment_id,
141                        offset,
142                        encoded_len,
143                        durable: true,
144                        fsync_duration_us: Some(fsync_duration_us),
145                    });
146                }
147
148                let action = inner.batch.after_record_appended(encoded.len());
149                inner.active_len = offset + u64::from(encoded_len);
150                inner.next_lsn = inner.next_lsn.next();
151                inner.next_sequence = inner.next_sequence.next();
152
153                match action {
154                    BatchAction::FlushNow => {
155                        let fsync_duration_us = inner.flush_strict_batch().await?;
156                        Ok(AppendResult {
157                            lsn,
158                            sequence,
159                            segment_id,
160                            offset,
161                            encoded_len,
162                            durable: true,
163                            fsync_duration_us: Some(fsync_duration_us),
164                        })
165                    }
166                    BatchAction::WaitForFlush(rx) => {
167                        drop(inner);
168                        let fsync_duration_us = rx.await.map_err(|_| {
169                            KayaError::internal("WAL batch waiter dropped before group commit")
170                        })?;
171                        Ok(AppendResult {
172                            lsn,
173                            sequence,
174                            segment_id,
175                            offset,
176                            encoded_len,
177                            durable: true,
178                            fsync_duration_us: Some(fsync_duration_us),
179                        })
180                    }
181                }
182            }
183        }
184    }
185}
186
187impl<D: Disk> WalWriterInner<D> {
188    async fn flush_pending_batch(&mut self) -> Result<()> {
189        if self.batch.has_pending() {
190            self.flush_strict_batch().await?;
191        }
192        Ok(())
193    }
194
195    async fn flush_strict_batch(&mut self) -> Result<u64> {
196        let start = Instant::now();
197        match self.disk.fsync_file(&self.active_path).await {
198            Ok(()) => {
199                let duration_us = start.elapsed().as_micros() as u64;
200                self.batch.complete_flush(duration_us);
201                Ok(duration_us)
202            }
203            Err(error) => {
204                self.batch.fail_flush();
205                Err(error)
206            }
207        }
208    }
209
210    async fn rotate(&mut self) -> Result<()> {
211        self.active_segment_id = SegmentId(self.active_segment_id.0 + 1);
212        self.active_path = segment_path(self.active_segment_id)?;
213        self.active_len = 0;
214        self.disk.fsync_dir(&RelativePath::new("wal")?).await?;
215        Ok(())
216    }
217}
218
219pub fn segment_path(segment_id: SegmentId) -> Result<RelativePath> {
220    RelativePath::new(format!("wal/{:016x}.wal", segment_id.0))
221}
222
223pub fn parse_segment_id(path: &RelativePath) -> Option<u64> {
224    let name = path.file_name()?;
225    let hex = name.strip_suffix(".wal")?;
226    u64::from_str_radix(hex, 16).ok()
227}