use std::collections::{HashMap, VecDeque};
#[allow(unused_imports)]
use super::functions::*;
use super::functions::{chunk_from_bytes, chunk_to_bytes, delta_decode, delta_encode, fnv1a_hash};
#[derive(Debug, Clone, Default)]
pub struct RleEncoder;
impl RleEncoder {
pub fn encode(data: &[u8]) -> Vec<u8> {
if data.is_empty() {
return Vec::new();
}
let mut out = Vec::new();
let mut i = 0;
while i < data.len() {
let val = data[i];
let mut run = 1usize;
while i + run < data.len() && data[i + run] == val && run < 254 {
run += 1;
}
if run >= 3 {
out.push(0xFF);
out.push(run as u8);
out.push(val);
} else {
for j in 0..run {
out.push(data[i + j]);
}
}
i += run;
}
out
}
pub fn decode(data: &[u8]) -> Vec<u8> {
let mut out = Vec::new();
let mut i = 0;
while i < data.len() {
if data[i] == 0xFF && i + 2 < data.len() {
let count = data[i + 1] as usize;
let val = data[i + 2];
for _ in 0..count {
out.push(val);
}
i += 3;
} else {
out.push(data[i]);
i += 1;
}
}
out
}
}
#[derive(Debug, Clone)]
pub struct TrajFrame {
pub idx: usize,
pub time: f64,
pub positions: Vec<f64>,
pub last_access: u64,
}
#[derive(Debug, Clone)]
pub struct WsFrame {
pub frame_type: WsFrameType,
pub payload: Vec<u8>,
pub fin: bool,
}
#[derive(Debug)]
pub struct ParallelIoCoordinator {
pub writers: Vec<WriterMeta>,
pub output: Vec<u8>,
pub total_size: usize,
}
impl ParallelIoCoordinator {
pub fn new(total_size: usize) -> Self {
Self {
writers: Vec::new(),
output: vec![0u8; total_size],
total_size,
}
}
pub fn register_writer(&mut self, start: usize, end: usize) -> usize {
let id = self.writers.len();
self.writers.push(WriterMeta {
id,
byte_range: (start, end),
bytes_written: 0,
done: false,
});
id
}
pub fn submit(&mut self, id: usize, data: &[u8]) {
if id >= self.writers.len() {
return;
}
let (start, end) = self.writers[id].byte_range;
let len = data.len().min(end.saturating_sub(start));
if start + len <= self.output.len() {
self.output[start..start + len].copy_from_slice(&data[..len]);
}
self.writers[id].bytes_written = len;
self.writers[id].done = true;
}
pub fn all_done(&self) -> bool {
self.writers.iter().all(|w| w.done)
}
pub fn total_written(&self) -> usize {
self.writers.iter().map(|w| w.bytes_written).sum()
}
}
#[derive(Debug, Clone)]
pub struct TrajectorySampler {
pub skip_frames: usize,
pub current: usize,
}
impl TrajectorySampler {
pub fn new(skip_frames: usize) -> Self {
Self {
skip_frames,
current: 0,
}
}
}
#[derive(Debug, Clone)]
pub struct WriterMeta {
pub id: usize,
pub byte_range: (usize, usize),
pub bytes_written: usize,
pub done: bool,
}
#[derive(Debug, Clone)]
pub struct ProtocolFrame {
pub timestamp: f64,
pub seq: u64,
pub payload: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum IoStatus {
Ready,
Pending,
Error(String),
}
#[derive(Debug, Clone)]
pub struct FrameInterpolator {
pub prev: Vec<f64>,
pub next: Vec<f64>,
pub t_prev: f64,
pub t_next: f64,
pub method: InterpolationMethod,
}
impl FrameInterpolator {
pub fn new(method: InterpolationMethod) -> Self {
Self {
prev: Vec::new(),
next: Vec::new(),
t_prev: 0.0,
t_next: 1.0,
method,
}
}
pub fn update(&mut self, prev: Vec<f64>, t_prev: f64, next: Vec<f64>, t_next: f64) {
self.prev = prev;
self.t_prev = t_prev;
self.next = next;
self.t_next = t_next;
}
pub fn interpolate(&self, t: f64) -> Vec<f64> {
let dt = self.t_next - self.t_prev;
if dt.abs() < 1e-30 {
return self.next.clone();
}
let alpha = ((t - self.t_prev) / dt).clamp(0.0, 1.0);
match self.method {
InterpolationMethod::Linear => self
.prev
.iter()
.zip(self.next.iter())
.map(|(&p, &n)| p + alpha * (n - p))
.collect(),
InterpolationMethod::CubicHermite => {
let h00 = 2.0 * alpha * alpha * alpha - 3.0 * alpha * alpha + 1.0;
let h01 = -2.0 * alpha * alpha * alpha + 3.0 * alpha * alpha;
self.prev
.iter()
.zip(self.next.iter())
.map(|(&p, &n)| h00 * p + h01 * n)
.collect()
}
InterpolationMethod::Nearest => {
if alpha < 0.5 {
self.prev.clone()
} else {
self.next.clone()
}
}
}
}
pub fn interpolation_error(&self, gt: &[f64], t: f64) -> f64 {
let interp = self.interpolate(t);
if gt.len() != interp.len() {
return f64::INFINITY;
}
let sum: f64 = interp
.iter()
.zip(gt.iter())
.map(|(a, b)| (a - b).powi(2))
.sum();
(sum / interp.len().max(1) as f64).sqrt()
}
}
#[derive(Debug, Clone)]
pub struct Packet {
pub dest: String,
pub data: Vec<u8>,
pub seq: u64,
pub latency_ms: f64,
}
#[derive(Debug)]
pub struct PhysicsStreamServer {
pub port: u16,
pub n_clients: usize,
pub frame_queue: VecDeque<ProtocolFrame>,
pub protocol: BinaryProtocol,
pub(super) seq: u64,
pub running: bool,
}
impl PhysicsStreamServer {
pub fn new(port: u16) -> Self {
Self {
port,
n_clients: 0,
frame_queue: VecDeque::new(),
protocol: BinaryProtocol::oxiphysics(),
seq: 0,
running: false,
}
}
pub fn start(&mut self) {
self.running = true;
}
pub fn stop(&mut self) {
self.running = false;
}
pub fn connect_client(&mut self) {
self.n_clients += 1;
}
pub fn disconnect_client(&mut self) {
if self.n_clients > 0 {
self.n_clients -= 1;
}
}
pub fn broadcast(&mut self, timestamp: f64, state: Vec<u8>) {
self.seq += 1;
let frame = ProtocolFrame {
timestamp,
seq: self.seq,
payload: state,
};
self.frame_queue.push_back(frame);
}
pub fn pop_frame(&mut self) -> Option<ProtocolFrame> {
self.frame_queue.pop_front()
}
pub fn queue_depth(&self) -> usize {
self.frame_queue.len()
}
}
#[derive(Debug, Clone)]
pub struct BinaryProtocol {
pub magic: [u8; 4],
pub version: u8,
}
impl BinaryProtocol {
pub fn new(magic: [u8; 4], version: u8) -> Self {
Self { magic, version }
}
pub fn oxiphysics() -> Self {
Self::new(*b"OXIP", 1)
}
pub fn encode(&self, frame: &ProtocolFrame) -> Vec<u8> {
let payload_len = frame.payload.len() as u32;
let mut buf = Vec::new();
buf.extend_from_slice(&self.magic);
buf.push(self.version);
buf.extend_from_slice(&frame.timestamp.to_le_bytes());
buf.extend_from_slice(&frame.seq.to_le_bytes());
buf.extend_from_slice(&payload_len.to_le_bytes());
buf.extend_from_slice(&frame.payload);
buf
}
pub fn decode(&self, data: &[u8]) -> Option<ProtocolFrame> {
if data.len() < 25 {
return None;
}
if data[0..4] != self.magic {
return None;
}
if data[4] != self.version {
return None;
}
let timestamp = f64::from_le_bytes(data[5..13].try_into().ok()?);
let seq = u64::from_le_bytes(data[13..21].try_into().ok()?);
let payload_len = u32::from_le_bytes(data[21..25].try_into().ok()?) as usize;
if data.len() < 25 + payload_len {
return None;
}
let payload = data[25..25 + payload_len].to_vec();
Some(ProtocolFrame {
timestamp,
seq,
payload,
})
}
pub fn header_size(&self) -> usize {
25
}
}
#[derive(Debug)]
pub struct NetworkTransport {
pub latency_ms: f64,
pub packet_loss: f64,
pub send_queue: VecDeque<Packet>,
pub recv_buffer: VecDeque<Packet>,
pub packets_sent: u64,
pub packets_dropped: u64,
pub(super) seq: u64,
}
impl NetworkTransport {
pub fn new(latency_ms: f64, packet_loss: f64) -> Self {
Self {
latency_ms,
packet_loss,
send_queue: VecDeque::new(),
recv_buffer: VecDeque::new(),
packets_sent: 0,
packets_dropped: 0,
seq: 0,
}
}
pub fn send_packet(&mut self, dest: &str, data: Vec<u8>) {
self.seq += 1;
let packet = Packet {
dest: dest.to_string(),
data,
seq: self.seq,
latency_ms: self.latency_ms,
};
if self.seq % 100 < (self.packet_loss * 100.0) as u64 {
self.packets_dropped += 1;
return;
}
self.send_queue.push_back(packet);
self.packets_sent += 1;
}
pub fn deliver(&mut self) {
while let Some(pkt) = self.send_queue.pop_front() {
self.recv_buffer.push_back(pkt);
}
}
pub fn receive_packet(&mut self) -> Option<Packet> {
self.recv_buffer.pop_front()
}
pub fn queue_depth(&self) -> usize {
self.send_queue.len() + self.recv_buffer.len()
}
}
#[derive(Debug)]
pub struct PhysicsStreamClient {
pub port: u16,
pub connected: bool,
pub frames: VecDeque<ProtocolFrame>,
pub protocol: BinaryProtocol,
pub latest_state: Vec<f64>,
pub prev_state: Vec<f64>,
}
impl PhysicsStreamClient {
pub fn new(port: u16) -> Self {
Self {
port,
connected: false,
frames: VecDeque::new(),
protocol: BinaryProtocol::oxiphysics(),
latest_state: Vec::new(),
prev_state: Vec::new(),
}
}
pub fn connect(&mut self) {
self.connected = true;
}
pub fn disconnect(&mut self) {
self.connected = false;
}
pub fn receive_frame(&mut self, frame: ProtocolFrame) {
self.frames.push_back(frame);
}
pub fn decode_latest(&mut self) {
if let Some(frame) = self.frames.back() {
self.prev_state = self.latest_state.clone();
let n = frame.payload.len() / 8;
self.latest_state = (0..n)
.map(|i| {
f64::from_le_bytes(
frame.payload[i * 8..(i + 1) * 8]
.try_into()
.unwrap_or([0u8; 8]),
)
})
.collect();
}
}
pub fn interpolated_state(&self, t: f64) -> Vec<f64> {
if self.prev_state.len() != self.latest_state.len() {
return self.latest_state.clone();
}
self.prev_state
.iter()
.zip(self.latest_state.iter())
.map(|(&p, &l)| p + t * (l - p))
.collect()
}
pub fn buffered_frames(&self) -> usize {
self.frames.len()
}
}
#[derive(Debug)]
pub struct ChunkedFileBuffer {
pub storage: Vec<u8>,
pub chunk_size: usize,
pub read_pos: usize,
pub write_pos: usize,
pub bytes_written: usize,
}
impl ChunkedFileBuffer {
pub fn new(chunk_size: usize) -> Self {
Self {
storage: Vec::new(),
chunk_size,
read_pos: 0,
write_pos: 0,
bytes_written: 0,
}
}
pub fn write_chunk(&mut self, data: &[u8]) {
self.storage.extend_from_slice(data);
self.write_pos += data.len();
self.bytes_written += data.len();
}
pub fn read_chunk(&mut self) -> Option<Vec<u8>> {
if self.read_pos >= self.storage.len() {
return None;
}
let end = (self.read_pos + self.chunk_size).min(self.storage.len());
let chunk = self.storage[self.read_pos..end].to_vec();
self.read_pos = end;
Some(chunk)
}
pub fn available_chunks(&self) -> usize {
let remaining = self.storage.len().saturating_sub(self.read_pos);
(remaining + self.chunk_size - 1) / self.chunk_size.max(1)
}
pub fn reset_read(&mut self) {
self.read_pos = 0;
}
pub fn stored_bytes(&self) -> usize {
self.storage.len()
}
}
#[derive(Debug)]
pub struct ChunkedReader {
pub path: String,
pub total_frames: usize,
pub(super) offsets: Vec<u64>,
}
impl ChunkedReader {
pub fn open(path: &str) -> Result<Self, String> {
let data = std::fs::read(path).map_err(|e| e.to_string())?;
let mut offsets = Vec::new();
let mut pos = 0usize;
while pos + 8 <= data.len() {
let frame_len = u64::from_le_bytes(
data[pos..pos + 8]
.try_into()
.map_err(|_| "bad length field")?,
) as usize;
offsets.push(pos as u64);
pos += 8 + frame_len;
}
let total = offsets.len();
Ok(Self {
path: path.into(),
total_frames: total,
offsets,
})
}
pub fn total_frames(&self) -> usize {
self.total_frames
}
pub fn read_chunk(&self, frame_idx: usize) -> Result<SimulationChunk, String> {
if frame_idx >= self.offsets.len() {
return Err(format!(
"frame_idx {frame_idx} out of range ({})",
self.offsets.len()
));
}
use std::io::{Read, Seek, SeekFrom};
let mut file = std::fs::File::open(&self.path).map_err(|e| e.to_string())?;
file.seek(SeekFrom::Start(self.offsets[frame_idx]))
.map_err(|e| e.to_string())?;
let mut len_buf = [0u8; 8];
file.read_exact(&mut len_buf).map_err(|e| e.to_string())?;
let frame_len = u64::from_le_bytes(len_buf) as usize;
let mut frame_data = vec![0u8; frame_len];
file.read_exact(&mut frame_data)
.map_err(|e| e.to_string())?;
chunk_from_bytes(&frame_data).ok_or_else(|| "corrupt frame data".into())
}
}
#[derive(Debug)]
pub struct AsyncIoStub {
pub buffer: Vec<u8>,
pub pos: usize,
pub pending_ticks: usize,
pub ops_completed: usize,
}
impl AsyncIoStub {
pub fn new(data: Vec<u8>, pending_ticks: usize) -> Self {
Self {
buffer: data,
pos: 0,
pending_ticks,
ops_completed: 0,
}
}
pub fn poll_read(&mut self, len: usize) -> (IoStatus, Vec<u8>) {
if self.pending_ticks > 0 {
self.pending_ticks -= 1;
return (IoStatus::Pending, Vec::new());
}
let end = (self.pos + len).min(self.buffer.len());
let data = self.buffer[self.pos..end].to_vec();
self.pos = end;
self.ops_completed += 1;
(IoStatus::Ready, data)
}
pub fn poll_write(&mut self, data: &[u8]) -> IoStatus {
if self.pending_ticks > 0 {
self.pending_ticks -= 1;
return IoStatus::Pending;
}
self.buffer.extend_from_slice(data);
self.ops_completed += 1;
IoStatus::Ready
}
pub fn blocking_read_all(&mut self, chunk_size: usize) -> Vec<u8> {
let mut out = Vec::new();
loop {
let (status, chunk) = self.poll_read(chunk_size);
if status == IoStatus::Pending {
continue;
}
if chunk.is_empty() {
break;
}
out.extend_from_slice(&chunk);
}
out
}
}
#[derive(Debug)]
pub struct CsvStreamParser {
pub delimiter: char,
pub headers: Vec<String>,
pub rows: Vec<Vec<String>>,
pub has_header: bool,
pub error_count: usize,
}
impl CsvStreamParser {
pub fn new(delimiter: char, has_header: bool) -> Self {
Self {
delimiter,
headers: Vec::new(),
rows: Vec::new(),
has_header,
error_count: 0,
}
}
pub fn feed_line(&mut self, line: &str) {
let line = line.trim();
if line.is_empty() {
return;
}
let fields: Vec<String> = line
.split(self.delimiter)
.map(|s| s.trim().to_string())
.collect();
if self.has_header && self.headers.is_empty() {
self.headers = fields;
} else {
self.rows.push(fields);
}
}
pub fn feed_text(&mut self, text: &str) {
for line in text.lines() {
self.feed_line(line);
}
}
pub fn column_f64(&self, name: &str) -> Vec<f64> {
let col_idx = self.headers.iter().position(|h| h == name);
match col_idx {
None => Vec::new(),
Some(idx) => self
.rows
.iter()
.filter_map(|row| row.get(idx).and_then(|v| v.parse::<f64>().ok()))
.collect(),
}
}
pub fn column_f64_by_idx(&self, idx: usize) -> Vec<f64> {
self.rows
.iter()
.filter_map(|row| row.get(idx).and_then(|v| v.parse::<f64>().ok()))
.collect()
}
pub fn row_count(&self) -> usize {
self.rows.len()
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum WsFrameType {
Text,
Binary,
Ping,
Pong,
Close,
}
#[derive(Debug)]
pub struct WebSocketBridge {
pub outgoing: VecDeque<WsFrame>,
pub incoming: VecDeque<WsFrame>,
pub connected: bool,
pub max_frame_size: usize,
}
impl WebSocketBridge {
pub fn new(max_frame_size: usize) -> Self {
Self {
outgoing: VecDeque::new(),
incoming: VecDeque::new(),
connected: false,
max_frame_size,
}
}
pub fn connect(&mut self) {
self.connected = true;
}
pub fn disconnect(&mut self) {
self.connected = false;
let close = WsFrame {
frame_type: WsFrameType::Close,
payload: Vec::new(),
fin: true,
};
self.outgoing.push_back(close);
}
pub fn send_text(&mut self, text: &str) {
let frame = WsFrame {
frame_type: WsFrameType::Text,
payload: text.as_bytes().to_vec(),
fin: true,
};
self.outgoing.push_back(frame);
}
pub fn send_binary(&mut self, data: Vec<u8>) {
let frame = WsFrame {
frame_type: WsFrameType::Binary,
payload: data,
fin: true,
};
self.outgoing.push_back(frame);
}
pub fn ping(&mut self, data: Vec<u8>) {
let frame = WsFrame {
frame_type: WsFrameType::Ping,
payload: data,
fin: true,
};
self.outgoing.push_back(frame);
}
pub fn recv_frame(&mut self) -> Option<WsFrame> {
self.incoming.pop_front()
}
pub fn inject_frame(&mut self, frame: WsFrame) {
if frame.frame_type == WsFrameType::Ping {
let pong = WsFrame {
frame_type: WsFrameType::Pong,
payload: frame.payload.clone(),
fin: true,
};
self.outgoing.push_back(pong);
}
self.incoming.push_back(frame);
}
pub fn pending_out(&self) -> usize {
self.outgoing.len()
}
}
#[derive(Debug)]
pub struct DataDeduplicator {
pub store: HashMap<u64, Vec<u8>>,
pub bytes_received: usize,
pub bytes_stored: usize,
pub hits: usize,
}
impl DataDeduplicator {
pub fn new() -> Self {
Self {
store: HashMap::new(),
bytes_received: 0,
bytes_stored: 0,
hits: 0,
}
}
pub fn ingest(&mut self, data: &[u8]) -> u64 {
self.bytes_received += data.len();
let hash = fnv1a_hash(data);
if let std::collections::hash_map::Entry::Vacant(e) = self.store.entry(hash) {
e.insert(data.to_vec());
self.bytes_stored += data.len();
} else {
self.hits += 1;
}
hash
}
pub fn retrieve(&self, hash: u64) -> Option<&[u8]> {
self.store.get(&hash).map(|v| v.as_slice())
}
pub fn dedup_ratio(&self) -> f64 {
if self.bytes_received == 0 {
return 0.0;
}
1.0 - self.bytes_stored as f64 / self.bytes_received as f64
}
pub fn unique_blocks(&self) -> usize {
self.store.len()
}
}
#[derive(Debug)]
pub struct VtkStreamWriter {
pub output: Vec<u8>,
pub dims: (usize, usize, usize),
pub spacing: (f64, f64, f64),
pub origin: (f64, f64, f64),
pub dataset_name: String,
pub header_written: bool,
pub fields_written: usize,
}
impl VtkStreamWriter {
pub fn new(
dims: (usize, usize, usize),
spacing: (f64, f64, f64),
origin: (f64, f64, f64),
dataset_name: &str,
) -> Self {
Self {
output: Vec::new(),
dims,
spacing,
origin,
dataset_name: dataset_name.to_string(),
header_written: false,
fields_written: 0,
}
}
pub fn write_header(&mut self) {
let header = format!(
"# vtk DataFile Version 3.0\n{}\nASCII\nDATASET STRUCTURED_POINTS\n\
DIMENSIONS {} {} {}\nSPACING {} {} {}\nORIGIN {} {} {}\nPOINT_DATA {}\n",
self.dataset_name,
self.dims.0,
self.dims.1,
self.dims.2,
self.spacing.0,
self.spacing.1,
self.spacing.2,
self.origin.0,
self.origin.1,
self.origin.2,
self.dims.0 * self.dims.1 * self.dims.2,
);
self.output.extend_from_slice(header.as_bytes());
self.header_written = true;
}
pub fn write_scalar_field(&mut self, field_name: &str, data: &[f64]) {
if !self.header_written {
self.write_header();
}
let mut section = format!("SCALARS {} float 1\nLOOKUP_TABLE default\n", field_name);
for &v in data {
section.push_str(&format!("{:.6e} ", v));
}
section.push('\n');
self.output.extend_from_slice(section.as_bytes());
self.fields_written += 1;
}
pub fn as_str(&self) -> &str {
std::str::from_utf8(&self.output).unwrap_or("")
}
pub fn bytes_written(&self) -> usize {
self.output.len()
}
}
#[derive(Debug)]
pub struct StreamingXYZWriter {
pub path: String,
pub frames_written: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum InterpolationMethod {
Linear,
CubicHermite,
Nearest,
}
#[derive(Debug)]
pub struct RingBuffer<T: Clone> {
pub(super) data: VecDeque<T>,
pub(super) capacity: usize,
}
impl<T: Clone> RingBuffer<T> {
pub fn new(capacity: usize) -> Self {
Self {
data: VecDeque::with_capacity(capacity),
capacity,
}
}
pub fn push(&mut self, item: T) -> bool {
if self.data.len() >= self.capacity {
return false;
}
self.data.push_back(item);
true
}
pub fn pop(&mut self) -> Option<T> {
self.data.pop_front()
}
pub fn len(&self) -> usize {
self.data.len()
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub fn is_full(&self) -> bool {
self.data.len() >= self.capacity
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn peek(&self) -> Option<&T> {
self.data.front()
}
}
#[derive(Debug)]
pub struct BackpressureBuffer<T: Clone> {
pub(super) inner: RingBuffer<T>,
pub high_watermark: f64,
pub low_watermark: f64,
pub backpressure: bool,
pub dropped: usize,
}
impl<T: Clone> BackpressureBuffer<T> {
pub fn new(capacity: usize, high_watermark: f64, low_watermark: f64) -> Self {
Self {
inner: RingBuffer::new(capacity),
high_watermark,
low_watermark,
backpressure: false,
dropped: 0,
}
}
pub fn push(&mut self, item: T) -> bool {
let fill = self.inner.len() as f64 / self.inner.capacity() as f64;
if fill >= self.high_watermark {
self.backpressure = true;
}
if self.backpressure {
self.dropped += 1;
return false;
}
self.inner.push(item)
}
pub fn pop(&mut self) -> Option<T> {
let item = self.inner.pop();
let fill = self.inner.len() as f64 / self.inner.capacity().max(1) as f64;
if fill <= self.low_watermark {
self.backpressure = false;
}
item
}
pub fn fill_fraction(&self) -> f64 {
self.inner.len() as f64 / self.inner.capacity().max(1) as f64
}
pub fn is_backpressure(&self) -> bool {
self.backpressure
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
#[derive(Debug)]
pub struct ChunkedWriter {
pub path: String,
pub frame_count: usize,
pub chunk_size: usize,
}
impl ChunkedWriter {
pub fn new(path: impl Into<String>, chunk_size: usize) -> Self {
Self {
path: path.into(),
frame_count: 0,
chunk_size,
}
}
pub fn write_chunk(&mut self, chunk: &SimulationChunk) -> Result<(), String> {
use std::io::Write as IoWrite;
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.map_err(|e| e.to_string())?;
let mut writer = std::io::BufWriter::new(file);
let data = chunk_to_bytes(chunk);
let len = (data.len() as u64).to_le_bytes();
writer.write_all(&len).map_err(|e| e.to_string())?;
writer.write_all(&data).map_err(|e| e.to_string())?;
self.frame_count += 1;
Ok(())
}
pub fn frame_count(&self) -> usize {
self.frame_count
}
pub fn finalize(&mut self) -> Result<(), String> {
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct CompressionStream;
impl CompressionStream {
pub fn compress_f32(data: &[f32]) -> Vec<u8> {
if data.is_empty() {
return Vec::new();
}
let mut out = Vec::new();
let mut i = 0;
while i < data.len() {
let val = data[i];
let mut run = 1usize;
while i + run < data.len() && data[i + run] == val && run < 255 {
run += 1;
}
if run > 2 {
out.push(0xFF);
out.push(run as u8);
out.extend_from_slice(&val.to_le_bytes());
} else {
for j in 0..run {
out.extend_from_slice(&data[i + j].to_le_bytes());
}
}
i += run;
}
out
}
pub fn decompress_f32(data: &[u8]) -> Vec<f32> {
let mut out = Vec::new();
let mut i = 0;
while i < data.len() {
if i + 4 <= data.len() && data[i] == 0xFF && i + 1 < data.len() {
let count = data[i + 1] as usize;
if i + 6 <= data.len() {
let val =
f32::from_le_bytes([data[i + 2], data[i + 3], data[i + 4], data[i + 5]]);
for _ in 0..count {
out.push(val);
}
i += 6;
continue;
}
}
if i + 4 <= data.len() {
let val = f32::from_le_bytes([data[i], data[i + 1], data[i + 2], data[i + 3]]);
out.push(val);
i += 4;
} else {
break;
}
}
out
}
pub fn compression_ratio(original: &[f32], compressed: &[u8]) -> f64 {
let orig_bytes = original.len() * 4;
if orig_bytes == 0 {
return 1.0;
}
compressed.len() as f64 / orig_bytes as f64
}
}
#[derive(Debug)]
pub struct StreamingXYZReader {
pub path: String,
pub current_frame: usize,
pub n_atoms: usize,
pub buffer: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct SimulationChunk {
pub frame_id: usize,
pub particle_count: usize,
pub positions: Vec<[f32; 3]>,
pub velocities: Vec<[f32; 3]>,
pub time: f64,
}
impl SimulationChunk {
pub fn new(frame_id: usize, time: f64) -> Self {
Self {
frame_id,
particle_count: 0,
positions: Vec::new(),
velocities: Vec::new(),
time,
}
}
pub fn add_particle(&mut self, pos: [f32; 3], vel: [f32; 3]) {
self.positions.push(pos);
self.velocities.push(vel);
self.particle_count += 1;
}
pub fn particle_count(&self) -> usize {
self.particle_count
}
pub fn byte_size(&self) -> usize {
self.particle_count * 2 * 3 * 4 + 24
}
}
#[derive(Debug, Clone)]
pub struct Snapshot {
pub seq: u64,
pub sim_time: f64,
pub delta_payload: Vec<u8>,
pub is_full: bool,
}
#[derive(Debug)]
pub struct CheckpointManager {
pub snapshots: Vec<Snapshot>,
pub full_interval: u64,
pub last_full_idx: Option<usize>,
pub step: u64,
}
impl CheckpointManager {
pub fn full_interval(full_interval: u64) -> Self {
Self {
snapshots: Vec::new(),
full_interval,
last_full_idx: None,
step: 0,
}
}
pub fn record(&mut self, sim_time: f64, state: &[u8], prev_state: &[u8]) {
self.step += 1;
let is_full = self.last_full_idx.is_none() || self.step.is_multiple_of(self.full_interval);
let delta_payload = if is_full {
state.to_vec()
} else {
delta_encode(prev_state, state)
};
let seq = self.step;
let snap = Snapshot {
seq,
sim_time,
delta_payload,
is_full,
};
if is_full {
self.last_full_idx = Some(self.snapshots.len());
}
self.snapshots.push(snap);
}
pub fn restore_latest(&self) -> Option<Vec<u8>> {
if self.snapshots.is_empty() {
return None;
}
let base_idx = self.last_full_idx?;
let mut state = self.snapshots[base_idx].delta_payload.clone();
for snap in &self.snapshots[base_idx + 1..] {
state = delta_decode(&state, &snap.delta_payload);
}
Some(state)
}
pub fn snapshot_count(&self) -> usize {
self.snapshots.len()
}
}
#[derive(Debug)]
pub struct StreamWriter {
pub buffer: Vec<u8>,
pub buffer_size: usize,
pub flush_interval: f64,
pub last_flush: f64,
pub total_written: usize,
pub flushed: Vec<u8>,
}
impl StreamWriter {
pub fn new(buffer_size: usize, flush_interval: f64) -> Self {
Self {
buffer: Vec::with_capacity(buffer_size),
buffer_size,
flush_interval,
last_flush: 0.0,
total_written: 0,
flushed: Vec::new(),
}
}
pub fn write(&mut self, data: &[u8]) {
self.buffer.extend_from_slice(data);
self.total_written += data.len();
if self.buffer.len() >= self.buffer_size {
self.flush();
}
}
pub fn write_f64(&mut self, v: f64) {
self.write(&v.to_le_bytes());
}
pub fn write_u32(&mut self, v: u32) {
self.write(&v.to_le_bytes());
}
pub fn flush(&mut self) {
self.flushed.extend_from_slice(&self.buffer);
self.buffer.clear();
}
pub fn maybe_flush(&mut self, current_time: f64) {
if current_time - self.last_flush >= self.flush_interval {
self.flush();
self.last_flush = current_time;
}
}
pub fn flushed_bytes(&self) -> usize {
self.flushed.len()
}
}
#[derive(Debug)]
pub struct TrajectoryCache {
pub frames: HashMap<usize, TrajFrame>,
pub capacity: usize,
pub(super) access_counter: u64,
pub misses: usize,
pub hits: usize,
pub evictions: usize,
}
impl TrajectoryCache {
pub fn new(capacity: usize) -> Self {
Self {
frames: HashMap::new(),
capacity,
access_counter: 0,
misses: 0,
hits: 0,
evictions: 0,
}
}
pub fn insert(&mut self, idx: usize, time: f64, positions: Vec<f64>) {
if self.frames.contains_key(&idx) {
self.access_counter += 1;
if let Some(f) = self.frames.get_mut(&idx) {
f.last_access = self.access_counter;
f.positions = positions;
f.time = time;
}
return;
}
if self.frames.len() >= self.capacity {
self.evict_lru();
}
self.access_counter += 1;
self.frames.insert(
idx,
TrajFrame {
idx,
time,
positions,
last_access: self.access_counter,
},
);
}
pub fn get(&mut self, idx: usize) -> Option<&TrajFrame> {
if self.frames.contains_key(&idx) {
self.access_counter += 1;
let ac = self.access_counter;
self.frames
.get_mut(&idx)
.expect("key must exist in map")
.last_access = ac;
self.hits += 1;
self.frames.get(&idx)
} else {
self.misses += 1;
None
}
}
fn evict_lru(&mut self) {
if let Some(&evict_idx) = self
.frames
.iter()
.min_by_key(|(_, f)| f.last_access)
.map(|(k, _)| k)
{
self.frames.remove(&evict_idx);
self.evictions += 1;
}
}
pub fn len(&self) -> usize {
self.frames.len()
}
pub fn is_empty(&self) -> bool {
self.frames.is_empty()
}
pub fn hit_ratio(&self) -> f64 {
let total = self.hits + self.misses;
if total == 0 {
return 0.0;
}
self.hits as f64 / total as f64
}
}
#[derive(Debug)]
pub struct StreamReader {
pub data: Vec<u8>,
pub pos: usize,
pub frame_size: usize,
pub frames_read: usize,
}
impl StreamReader {
pub fn new(data: Vec<u8>, frame_size: usize) -> Self {
Self {
data,
pos: 0,
frame_size,
frames_read: 0,
}
}
pub fn read_frame(&mut self) -> Option<Vec<u8>> {
if self.pos + self.frame_size > self.data.len() {
return None;
}
let frame = self.data[self.pos..self.pos + self.frame_size].to_vec();
self.pos += self.frame_size;
self.frames_read += 1;
Some(frame)
}
pub fn read_all<F: FnMut(&[u8])>(&mut self, mut callback: F) {
while let Some(frame) = self.read_frame() {
callback(&frame);
}
}
pub fn seek(&mut self, pos: usize) {
self.pos = pos.min(self.data.len());
}
pub fn remaining(&self) -> usize {
self.data.len().saturating_sub(self.pos)
}
pub fn append_data(&mut self, more: &[u8]) {
self.data.extend_from_slice(more);
}
}