1use std::fmt;
2use std::fs::{File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use crate::error::{Result, WalError};
10use crate::{FILE_HEADER_SIZE, NANO_LOG_SIGNATURE};
11
/// One on-disk log segment together with the handles used to access it.
///
/// A segment is built by `Segment::create` (fresh file, new header) or
/// `Segment::open` (existing file, header validated).
pub struct Segment {
    /// The segment's open file handle, guarded by a mutex so that access
    /// through it is serialized.
    pub(crate) file: Mutex<File>,
    /// Shared descriptor obtained from `dup_read_fd` on `file`; exposed to
    /// callers through `read_fd()`.
    read_fd: Arc<OwnedFd>,
    /// Path that was passed to `create` / `open`.
    path: PathBuf,
    /// Expiration value stored in the file header (bytes 8..16, little-endian `i64`).
    expiration_ms: i64,
    /// Tracked size of the file in bytes: starts at the header size (`create`)
    /// or the end-of-file offset (`open`) and is advanced by `add_file_size`.
    file_size: AtomicU64,
}
24
25impl Segment {
26 pub(crate) fn create(path: &Path, expiration_ms: i64) -> Result<Self> {
30 let mut file = OpenOptions::new()
31 .create(true)
32 .read(true)
33 .write(true)
34 .truncate(true)
35 .open(path)?;
36
37 let mut header = [0u8; FILE_HEADER_SIZE];
39 header[..8].copy_from_slice(&NANO_LOG_SIGNATURE);
40 header[8..16].copy_from_slice(&expiration_ms.to_le_bytes());
41 file.write_all(&header)?;
42 file.flush()?;
43
44 let read_fd = Arc::new(dup_read_fd(&file)?);
45
46 Ok(Segment {
47 file: Mutex::new(file),
48 read_fd,
49 path: path.to_path_buf(),
50 expiration_ms,
51 file_size: AtomicU64::new(FILE_HEADER_SIZE as u64),
52 })
53 }
54
55 pub(crate) fn open(path: &Path, expected_expiration_ms: i64) -> Result<Self> {
57 let mut file = OpenOptions::new()
58 .read(true)
59 .append(true)
60 .open(path)?;
61
62 let mut header = [0u8; FILE_HEADER_SIZE];
64 file.seek(SeekFrom::Start(0))?;
65 file.read_exact(&mut header).map_err(|e| {
66 WalError::CorruptedData(format!("failed to read segment header: {}", e))
67 })?;
68
69 if header[..8] != NANO_LOG_SIGNATURE {
71 return Err(WalError::CorruptedData(format!(
72 "invalid segment magic: expected NANO-LOG, got {:?}",
73 &header[..8]
74 )));
75 }
76
77 let stored_expiration = i64::from_le_bytes(header[8..16].try_into().unwrap());
79 if stored_expiration != expected_expiration_ms {
80 return Err(WalError::CorruptedData(format!(
81 "expiration mismatch: expected {}, got {}",
82 expected_expiration_ms, stored_expiration
83 )));
84 }
85
86 let file_size = file.seek(SeekFrom::End(0))?;
88
89 let read_fd = Arc::new(dup_read_fd(&file)?);
90
91 Ok(Segment {
92 file: Mutex::new(file),
93 read_fd,
94 path: path.to_path_buf(),
95 expiration_ms: expected_expiration_ms,
96 file_size: AtomicU64::new(file_size),
97 })
98 }
99
100 pub fn read_fd(&self) -> &Arc<OwnedFd> {
102 &self.read_fd
103 }
104
105 pub fn file_size(&self) -> u64 {
107 self.file_size.load(Ordering::Acquire)
108 }
109
110 pub(crate) fn add_file_size(&self, bytes: u64) -> u64 {
112 self.file_size.fetch_add(bytes, Ordering::AcqRel)
113 }
114
115 pub fn expiration_ms(&self) -> i64 {
117 self.expiration_ms
118 }
119
120 pub fn path(&self) -> &Path {
122 &self.path
123 }
124}
125
126impl fmt::Debug for Segment {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 f.debug_struct("Segment")
129 .field("path", &self.path)
130 .field("expiration_ms", &self.expiration_ms)
131 .field("file_size", &self.file_size.load(Ordering::Relaxed))
132 .finish()
133 }
134}
135
136pub(crate) fn dup_read_fd(file: &File) -> Result<OwnedFd> {
141 let raw = file.as_raw_fd();
142 let duped = unsafe { libc::dup(raw) };
143 if duped < 0 {
144 return Err(WalError::Io(std::io::Error::last_os_error()));
145 }
146
147 #[cfg(target_os = "linux")]
149 {
150 unsafe {
151 libc::posix_fadvise(duped, 0, 0, libc::POSIX_FADV_SEQUENTIAL);
152 }
153 }
154
155 Ok(unsafe { OwnedFd::from_raw_fd(duped) })
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    /// A freshly created segment reports header-only state and the header
    /// bytes on disk carry the magic and the expiration.
    #[test]
    fn test_create_segment_writes_header() {
        let tmp = TempDir::new().unwrap();
        let seg_path = tmp.path().join("test_12345.seg");
        let seg = Segment::create(&seg_path, 12345).unwrap();

        assert_eq!(seg.expiration_ms(), 12345);
        assert_eq!(seg.file_size(), FILE_HEADER_SIZE as u64);
        assert!(seg.path() == seg_path);

        let on_disk = std::fs::read(&seg_path).unwrap();
        let (magic, rest) = on_disk.split_at(8);
        assert_eq!(magic, b"NANO-LOG");
        let stored = i64::from_le_bytes(rest[..8].try_into().unwrap());
        assert_eq!(stored, 12345);
    }

    /// After writing through the locked file handle, a positional read at
    /// offset 0 on the read descriptor still yields the magic.
    #[test]
    fn test_read_fd_is_independent() {
        let tmp = TempDir::new().unwrap();
        let seg_path = tmp.path().join("test_99999.seg");
        let seg = Segment::create(&seg_path, 99999).unwrap();

        {
            let mut writer = seg.file.lock().unwrap();
            writer.write_all(b"extra data").unwrap();
            writer.flush().unwrap();
        }

        let reader = seg.read_fd();
        let raw = std::os::fd::AsRawFd::as_raw_fd(reader.as_ref());
        let mut magic = [0u8; 8];
        let out_ptr = magic.as_mut_ptr() as *mut libc::c_void;
        let n_read = unsafe { libc::pread(raw, out_ptr, 8, 0) };
        assert_eq!(n_read, 8);
        assert_eq!(&magic, b"NANO-LOG");
    }

    /// Reopening a segment restores the expiration and picks up the size of
    /// everything written before it was dropped.
    #[test]
    fn test_reopen_segment() {
        let tmp = TempDir::new().unwrap();
        let seg_path = tmp.path().join("test_55555.seg");

        {
            let seg = Segment::create(&seg_path, 55555).unwrap();
            seg.file.lock().unwrap().write_all(b"payload").unwrap();
        }

        let reopened = Segment::open(&seg_path, 55555).unwrap();
        assert_eq!(reopened.expiration_ms(), 55555);
        let expected_size = (FILE_HEADER_SIZE + 7) as u64;
        assert_eq!(reopened.file_size(), expected_size);
    }
}