use std::fmt;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use crate::error::{Result, WalError};
use crate::{FILE_HEADER_SIZE, NANO_LOG_SIGNATURE};
/// A single on-disk log segment together with the handles used to access it.
///
/// Instances are produced by [`Segment::create`] (new file, header written)
/// or [`Segment::open`] (existing file, header validated).
pub struct Segment {
    /// Read/write handle to the segment file, serialised behind a mutex.
    pub(crate) file: Mutex<File>,
    /// Duplicate of `file`'s descriptor (see `dup_read_fd`), shareable via `Arc`.
    read_fd: Arc<OwnedFd>,
    /// Path the segment was created at or opened from.
    path: PathBuf,
    /// Expiration value stored in the header; the `_ms` suffix suggests
    /// milliseconds, but nothing in this file fixes the epoch or unit.
    expiration_ms: i64,
    /// In-process byte count for the file: starts at the header size on
    /// `create`, or at the end-of-file offset on `open`, and is advanced
    /// through `add_file_size`.
    file_size: AtomicU64,
}
impl Segment {
    /// Creates the segment file at `path` and writes a fresh header into it.
    ///
    /// The file is opened read/write with `create` + `truncate`, so any
    /// existing content at `path` is discarded. The header is
    /// `FILE_HEADER_SIZE` bytes: the signature in bytes `0..8`, then
    /// `expiration_ms` as a little-endian `i64` in bytes `8..16`, with the
    /// remainder left zeroed.
    ///
    /// # Errors
    ///
    /// Propagates failures from opening or writing the file and from
    /// `dup_read_fd`.
    pub(crate) fn create(path: &Path, expiration_ms: i64) -> Result<Self> {
        let mut options = OpenOptions::new();
        options.create(true).read(true).write(true).truncate(true);
        let mut file = options.open(path)?;

        // Lay out the header: signature first, expiration right behind it.
        let mut header = [0u8; FILE_HEADER_SIZE];
        let (magic, rest) = header.split_at_mut(8);
        magic.copy_from_slice(&NANO_LOG_SIGNATURE);
        rest[..8].copy_from_slice(&expiration_ms.to_le_bytes());

        file.write_all(&header)?;
        file.flush()?;

        let read_fd = Arc::new(dup_read_fd(&file)?);
        let file_size = AtomicU64::new(FILE_HEADER_SIZE as u64);
        Ok(Segment {
            file: Mutex::new(file),
            read_fd,
            path: path.to_path_buf(),
            expiration_ms,
            file_size,
        })
    }

    /// Opens an existing segment at `path` in read + append mode and checks
    /// its header.
    ///
    /// The tracked file size is initialised to the end-of-file offset.
    ///
    /// # Errors
    ///
    /// Returns `WalError::CorruptedData` when the header cannot be read in
    /// full, when the first eight bytes differ from the signature, or when
    /// the stored expiration differs from `expected_expiration_ms`. Other
    /// open/seek failures and `dup_read_fd` failures are propagated.
    pub(crate) fn open(path: &Path, expected_expiration_ms: i64) -> Result<Self> {
        let mut file = OpenOptions::new().read(true).append(true).open(path)?;

        file.seek(SeekFrom::Start(0))?;
        let mut header = [0u8; FILE_HEADER_SIZE];
        if let Err(e) = file.read_exact(&mut header) {
            return Err(WalError::CorruptedData(format!(
                "failed to read segment header: {}",
                e
            )));
        }

        let magic = &header[..8];
        if *magic != NANO_LOG_SIGNATURE {
            return Err(WalError::CorruptedData(format!(
                "invalid segment magic: expected NANO-LOG, got {:?}",
                magic
            )));
        }

        let mut expiration_bytes = [0u8; 8];
        expiration_bytes.copy_from_slice(&header[8..16]);
        let stored_expiration = i64::from_le_bytes(expiration_bytes);
        if stored_expiration != expected_expiration_ms {
            return Err(WalError::CorruptedData(format!(
                "expiration mismatch: expected {}, got {}",
                expected_expiration_ms, stored_expiration
            )));
        }

        // Seeking to the end both measures the file and leaves the cursor there.
        let end_offset = file.seek(SeekFrom::End(0))?;
        let read_fd = Arc::new(dup_read_fd(&file)?);
        Ok(Segment {
            file: Mutex::new(file),
            read_fd,
            path: path.to_path_buf(),
            expiration_ms: expected_expiration_ms,
            file_size: AtomicU64::new(end_offset),
        })
    }

    /// Returns the shared duplicate descriptor for this segment.
    pub fn read_fd(&self) -> &Arc<OwnedFd> {
        &self.read_fd
    }

    /// Returns the currently tracked file size in bytes (acquire load).
    pub fn file_size(&self) -> u64 {
        self.file_size.load(Ordering::Acquire)
    }

    /// Adds `bytes` to the tracked file size and returns the value it held
    /// *before* the addition.
    pub(crate) fn add_file_size(&self, bytes: u64) -> u64 {
        self.file_size.fetch_add(bytes, Ordering::AcqRel)
    }

    /// Returns the expiration value associated with this segment.
    pub fn expiration_ms(&self) -> i64 {
        self.expiration_ms
    }

    /// Returns the path this segment was created at or opened from.
    pub fn path(&self) -> &Path {
        &self.path
    }
}
/// Debug output lists the path, expiration and a relaxed snapshot of the
/// tracked file size; the file handle and read descriptor are omitted.
impl fmt::Debug for Segment {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let size_snapshot = self.file_size.load(Ordering::Relaxed);
        let mut builder = f.debug_struct("Segment");
        builder.field("path", &self.path);
        builder.field("expiration_ms", &self.expiration_ms);
        builder.field("file_size", &size_snapshot);
        builder.finish()
    }
}
/// Duplicates `file`'s descriptor and returns the copy as an [`OwnedFd`].
///
/// The duplicate is created with `F_DUPFD_CLOEXEC`, so it is closed
/// automatically when the process `exec`s a child. Plain `dup` would leave
/// the flag clear and leak the descriptor into children. The lower bound of
/// `3` keeps the new descriptor off the stdio slots.
///
/// As with any `dup`-style copy, the new descriptor refers to the same open
/// file description as `file`, so the two share one file offset.
///
/// On Linux the descriptor is additionally hinted as sequentially accessed
/// via `posix_fadvise`; the hint is best-effort and its result is ignored.
///
/// # Errors
///
/// Returns `WalError::Io` carrying the OS error when duplication fails.
pub(crate) fn dup_read_fd(file: &File) -> Result<OwnedFd> {
    let raw = file.as_raw_fd();
    // SAFETY: `raw` stays a valid open descriptor while `file` is borrowed,
    // and `F_DUPFD_CLOEXEC` does not read or write any caller memory.
    let duped = unsafe { libc::fcntl(raw, libc::F_DUPFD_CLOEXEC, 3) };
    if duped < 0 {
        return Err(WalError::Io(std::io::Error::last_os_error()));
    }
    // SAFETY: `duped` was just returned by a successful `fcntl` and nothing
    // else owns it, so handing ownership to `OwnedFd` is sound.
    let owned = unsafe { OwnedFd::from_raw_fd(duped) };
    #[cfg(target_os = "linux")]
    {
        // SAFETY: `owned` holds a valid open descriptor for the whole call.
        // Advisory only: a failure here does not affect correctness.
        unsafe {
            libc::posix_fadvise(owned.as_raw_fd(), 0, 0, libc::POSIX_FADV_SEQUENTIAL);
        }
    }
    Ok(owned)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    /// Builds a fresh temporary directory and a path for `file_name` inside
    /// it. The returned guard must stay alive for as long as the path is used.
    fn scratch_path(file_name: &str) -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join(file_name);
        (dir, path)
    }

    /// `create` must report header-sized state and persist the header bytes.
    #[test]
    fn test_create_segment_writes_header() {
        let (_dir, path) = scratch_path("test_12345.seg");

        let segment = Segment::create(&path, 12345).unwrap();
        assert_eq!(segment.expiration_ms(), 12345);
        assert_eq!(segment.file_size(), FILE_HEADER_SIZE as u64);
        assert!(segment.path() == path);

        let on_disk = std::fs::read(&path).unwrap();
        assert_eq!(&on_disk[0..8], b"NANO-LOG");
        let stored = i64::from_le_bytes(on_disk[8..16].try_into().unwrap());
        assert_eq!(stored, 12345);
    }

    /// A positional read through the duplicate descriptor still sees the
    /// header after more data was written through the main handle.
    #[test]
    fn test_read_fd_is_independent() {
        let (_dir, path) = scratch_path("test_99999.seg");
        let segment = Segment::create(&path, 99999).unwrap();

        {
            let mut writer = segment.file.lock().unwrap();
            writer.write_all(b"extra data").unwrap();
            writer.flush().unwrap();
        }

        let raw = segment.read_fd().as_raw_fd();
        let mut magic = [0u8; 8];
        let ret = unsafe {
            libc::pread(raw, magic.as_mut_ptr() as *mut libc::c_void, 8, 0)
        };
        assert_eq!(ret, 8);
        assert_eq!(&magic, b"NANO-LOG");
    }

    /// Reopening a segment picks up the stored expiration and the full
    /// on-disk size, including bytes appended after the header.
    #[test]
    fn test_reopen_segment() {
        let (_dir, path) = scratch_path("test_55555.seg");

        {
            let segment = Segment::create(&path, 55555).unwrap();
            segment.file.lock().unwrap().write_all(b"payload").unwrap();
        }

        let reopened = Segment::open(&path, 55555).unwrap();
        assert_eq!(reopened.expiration_ms(), 55555);
        assert_eq!(reopened.file_size(), (FILE_HEADER_SIZE + 7) as u64);
    }
}