use std::fmt;
use std::io::{self, Read};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use bytes::{Bytes, BytesMut};
use crate::error::{NetError, NetResult};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServeStrategy {
Sendfile,
Splice,
VectoredIo,
ChunkedCopy,
SingleAlloc,
}
impl fmt::Display for ServeStrategy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Sendfile => f.write_str("sendfile"),
Self::Splice => f.write_str("splice"),
Self::VectoredIo => f.write_str("vectored_io"),
Self::ChunkedCopy => f.write_str("chunked_copy"),
Self::SingleAlloc => f.write_str("single_alloc"),
}
}
}
impl ServeStrategy {
#[must_use]
pub fn best_available() -> Self {
#[cfg(any(target_os = "linux", target_os = "macos"))]
{
Self::Sendfile
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
Self::ChunkedCopy
}
}
}
#[derive(Debug, Clone)]
pub enum SegmentSource {
File(PathBuf),
Memory(Bytes),
}
impl SegmentSource {
#[must_use]
pub fn known_length(&self) -> Option<usize> {
match self {
Self::File(_) => None,
Self::Memory(b) => Some(b.len()),
}
}
}
#[derive(Debug, Clone)]
pub struct TransferStats {
pub bytes: u64,
pub elapsed: Duration,
pub strategy: ServeStrategy,
pub syscall_count: u32,
}
impl TransferStats {
#[must_use]
pub fn throughput_mib_s(&self) -> f64 {
let secs = self.elapsed.as_secs_f64();
if secs < 1e-9 {
return 0.0;
}
(self.bytes as f64) / (secs * 1024.0 * 1024.0)
}
}
impl fmt::Display for TransferStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{} bytes via {} in {:.1}ms ({:.1} MiB/s, {} syscalls)",
self.bytes,
self.strategy,
self.elapsed.as_secs_f64() * 1000.0,
self.throughput_mib_s(),
self.syscall_count,
)
}
}
pub const DEFAULT_CHUNK_SIZE: usize = 65_536;
#[derive(Debug, Clone)]
pub struct ServerConfig {
pub preferred_strategy: ServeStrategy,
pub chunk_size: usize,
pub max_segment_bytes: u64,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
preferred_strategy: ServeStrategy::best_available(),
chunk_size: DEFAULT_CHUNK_SIZE,
max_segment_bytes: 64 * 1024 * 1024, }
}
}
impl ServerConfig {
pub fn validate(&self) -> NetResult<()> {
if self.chunk_size < 512 {
return Err(NetError::protocol(format!(
"chunk_size must be >= 512, got {}",
self.chunk_size
)));
}
if self.max_segment_bytes == 0 {
return Err(NetError::protocol("max_segment_bytes must be > 0"));
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct ServeResult {
pub data: Bytes,
pub stats: TransferStats,
}
#[derive(Debug, Clone)]
pub struct SegmentServer {
config: ServerConfig,
}
impl SegmentServer {
pub fn new(config: ServerConfig) -> NetResult<Self> {
config.validate()?;
Ok(Self { config })
}
pub fn with_defaults() -> NetResult<Self> {
Self::new(ServerConfig::default())
}
#[must_use]
pub const fn config(&self) -> &ServerConfig {
&self.config
}
pub fn serve(&self, source: SegmentSource) -> NetResult<ServeResult> {
match source {
SegmentSource::Memory(bytes) => self.serve_memory(bytes),
SegmentSource::File(path) => self.serve_file(&path),
}
}
fn serve_memory(&self, bytes: Bytes) -> NetResult<ServeResult> {
if bytes.len() as u64 > self.config.max_segment_bytes {
return Err(NetError::buffer(format!(
"segment {} B exceeds limit {} B",
bytes.len(),
self.config.max_segment_bytes
)));
}
let start = Instant::now();
let len = bytes.len() as u64;
Ok(ServeResult {
data: bytes,
stats: TransferStats {
bytes: len,
elapsed: start.elapsed(),
strategy: ServeStrategy::SingleAlloc,
syscall_count: 0,
},
})
}
fn serve_file(&self, path: &Path) -> NetResult<ServeResult> {
let start = Instant::now();
let strategy = self.config.preferred_strategy;
let metadata = std::fs::metadata(path).map_err(|e| {
NetError::Io(io::Error::new(
e.kind(),
format!("stat {}: {e}", path.display()),
))
})?;
let file_size = metadata.len();
if file_size > self.config.max_segment_bytes {
return Err(NetError::buffer(format!(
"file {} B exceeds limit {} B",
file_size, self.config.max_segment_bytes
)));
}
let mut file = std::fs::File::open(path).map_err(|e| {
NetError::Io(io::Error::new(
e.kind(),
format!("open {}: {e}", path.display()),
))
})?;
let mut buf = BytesMut::with_capacity(file_size as usize);
let chunk_size = self.config.chunk_size;
let mut tmp = vec![0u8; chunk_size];
let mut syscalls: u32 = 0;
loop {
let n = file.read(&mut tmp).map_err(|e| NetError::Io(e))?;
syscalls += 1;
if n == 0 {
break;
}
buf.extend_from_slice(&tmp[..n]);
}
let bytes_transferred = buf.len() as u64;
Ok(ServeResult {
data: buf.freeze(),
stats: TransferStats {
bytes: bytes_transferred,
elapsed: start.elapsed(),
strategy,
syscall_count: syscalls,
},
})
}
}
#[derive(Debug)]
pub struct SegmentCache {
entries: std::collections::VecDeque<CacheEntry>,
max_entries: usize,
}
#[derive(Debug, Clone)]
struct CacheEntry {
key: String,
data: Bytes,
}
impl SegmentCache {
pub fn new(max_entries: usize) -> NetResult<Self> {
if max_entries == 0 {
return Err(NetError::protocol("max_entries must be > 0"));
}
Ok(Self {
entries: std::collections::VecDeque::with_capacity(max_entries),
max_entries,
})
}
pub fn insert(&mut self, key: impl Into<String>, data: Bytes) {
let key = key.into();
self.entries.retain(|e| e.key != key);
if self.entries.len() >= self.max_entries {
self.entries.pop_front();
}
self.entries.push_back(CacheEntry { key, data });
}
#[must_use]
pub fn get(&self, key: &str) -> Option<Bytes> {
self.entries
.iter()
.find(|e| e.key == key)
.map(|e| e.data.clone())
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn clear(&mut self) {
self.entries.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
fn write_tmp_file(content: &[u8]) -> PathBuf {
let mut path = std::env::temp_dir();
path.push(format!("oximedia_zcs_test_{}.ts", std::process::id()));
let mut f = std::fs::File::create(&path).expect("create tmp");
f.write_all(content).expect("write tmp");
path
}
#[test]
fn test_serve_memory_roundtrip() {
let server = SegmentServer::with_defaults().expect("server");
let payload = Bytes::from_static(b"HLS segment data");
let result = server
.serve(SegmentSource::Memory(payload.clone()))
.expect("serve");
assert_eq!(result.data, payload);
}
#[test]
fn test_serve_memory_strategy() {
let server = SegmentServer::with_defaults().expect("server");
let result = server
.serve(SegmentSource::Memory(Bytes::from_static(b"x")))
.expect("serve");
assert_eq!(result.stats.strategy, ServeStrategy::SingleAlloc);
}
#[test]
fn test_serve_memory_size_limit() {
let cfg = ServerConfig {
max_segment_bytes: 4,
..Default::default()
};
let server = SegmentServer::new(cfg).expect("server");
let big = Bytes::from(vec![0u8; 5]);
assert!(server.serve(SegmentSource::Memory(big)).is_err());
}
#[test]
fn test_serve_file_roundtrip() {
let content = b"MPEG-TS segment payload";
let path = write_tmp_file(content);
let server = SegmentServer::with_defaults().expect("server");
let result = server
.serve(SegmentSource::File(path.clone()))
.expect("serve");
assert_eq!(result.data.as_ref(), content);
let _ = std::fs::remove_file(path);
}
#[test]
fn test_serve_file_byte_count() {
let content = vec![0xFFu8; 1024];
let path = write_tmp_file(&content);
let server = SegmentServer::with_defaults().expect("server");
let result = server
.serve(SegmentSource::File(path.clone()))
.expect("serve");
assert_eq!(result.stats.bytes, 1024);
let _ = std::fs::remove_file(path);
}
#[test]
fn test_serve_missing_file() {
let server = SegmentServer::with_defaults().expect("server");
let src = SegmentSource::File(PathBuf::from("/nonexistent/segment.ts"));
assert!(server.serve(src).is_err());
}
#[test]
fn test_config_validation_chunk_size() {
let cfg = ServerConfig {
chunk_size: 100,
..Default::default()
};
assert!(cfg.validate().is_err());
}
#[test]
fn test_config_validation_max_bytes_zero() {
let cfg = ServerConfig {
max_segment_bytes: 0,
..Default::default()
};
assert!(cfg.validate().is_err());
}
#[test]
fn test_cache_insert_get() {
let mut cache = SegmentCache::new(4).expect("cache");
cache.insert("seg0.ts", Bytes::from_static(b"data0"));
let got = cache.get("seg0.ts").expect("hit");
assert_eq!(got.as_ref(), b"data0");
}
#[test]
fn test_cache_miss() {
let cache = SegmentCache::new(4).expect("cache");
assert!(cache.get("missing.ts").is_none());
}
#[test]
fn test_cache_eviction() {
let mut cache = SegmentCache::new(2).expect("cache");
cache.insert("seg0.ts", Bytes::from_static(b"0"));
cache.insert("seg1.ts", Bytes::from_static(b"1"));
cache.insert("seg2.ts", Bytes::from_static(b"2")); assert!(cache.get("seg0.ts").is_none(), "seg0 should be evicted");
assert!(cache.get("seg1.ts").is_some());
assert!(cache.get("seg2.ts").is_some());
}
#[test]
fn test_transfer_stats_throughput() {
let stats = TransferStats {
bytes: 1024 * 1024, elapsed: Duration::from_secs(1),
strategy: ServeStrategy::ChunkedCopy,
syscall_count: 16,
};
let t = stats.throughput_mib_s();
assert!((t - 1.0).abs() < 0.01, "throughput should be ~1.0 MiB/s");
}
#[test]
fn test_best_available_strategy() {
let s = ServeStrategy::best_available();
let s_str = format!("{s}");
assert!(!s_str.is_empty());
}
#[test]
fn test_segment_source_known_length() {
let mem = SegmentSource::Memory(Bytes::from_static(b"hello"));
assert_eq!(mem.known_length(), Some(5));
let file = SegmentSource::File(std::env::temp_dir().join("oximedia-net-zcopy-test.ts"));
assert!(file.known_length().is_none());
}
#[test]
fn test_cache_clear() {
let mut cache = SegmentCache::new(4).expect("cache");
cache.insert("a", Bytes::from_static(b"1"));
cache.insert("b", Bytes::from_static(b"2"));
cache.clear();
assert!(cache.is_empty());
assert_eq!(cache.len(), 0);
}
}