Skip to main content

oxiphysics_io/streaming_io/
types.rs

1//! Auto-generated module
2//!
3//! 🤖 Generated with [SplitRS](https://github.com/cool-japan/splitrs)
4
5use std::collections::{HashMap, VecDeque};
6
7#[allow(unused_imports)]
8use super::functions::*;
9use super::functions::{chunk_from_bytes, chunk_to_bytes, delta_decode, delta_encode, fnv1a_hash};
10
11/// Run-length encoder/decoder for byte streams (trajectory data).
12///
13/// Format: repeated runs are encoded as `[0xFF, count, value]`; literals are
14/// passed through.  This mirrors the f32 variant in `CompressionStream` but
15/// operates on raw bytes.
16#[derive(Debug, Clone, Default)]
17pub struct RleEncoder;
18impl RleEncoder {
19    /// Encode a byte slice using RLE.
20    pub fn encode(data: &[u8]) -> Vec<u8> {
21        if data.is_empty() {
22            return Vec::new();
23        }
24        let mut out = Vec::new();
25        let mut i = 0;
26        while i < data.len() {
27            let val = data[i];
28            let mut run = 1usize;
29            while i + run < data.len() && data[i + run] == val && run < 254 {
30                run += 1;
31            }
32            if run >= 3 {
33                out.push(0xFF);
34                out.push(run as u8);
35                out.push(val);
36            } else {
37                for j in 0..run {
38                    out.push(data[i + j]);
39                }
40            }
41            i += run;
42        }
43        out
44    }
45    /// Decode a byte slice produced by `encode`.
46    pub fn decode(data: &[u8]) -> Vec<u8> {
47        let mut out = Vec::new();
48        let mut i = 0;
49        while i < data.len() {
50            if data[i] == 0xFF && i + 2 < data.len() {
51                let count = data[i + 1] as usize;
52                let val = data[i + 2];
53                for _ in 0..count {
54                    out.push(val);
55                }
56                i += 3;
57            } else {
58                out.push(data[i]);
59                i += 1;
60            }
61        }
62        out
63    }
64}
65/// A cached trajectory frame.
66#[derive(Debug, Clone)]
67pub struct TrajFrame {
68    /// Frame index.
69    pub idx: usize,
70    /// Simulation time.
71    pub time: f64,
72    /// Position data (flat: N particles × 3).
73    pub positions: Vec<f64>,
74    /// Last access count (used for LRU).
75    pub last_access: u64,
76}
77/// WebSocket frame.
78#[derive(Debug, Clone)]
79pub struct WsFrame {
80    /// Frame type.
81    pub frame_type: WsFrameType,
82    /// Payload data.
83    pub payload: Vec<u8>,
84    /// Final fragment flag.
85    pub fin: bool,
86}
87/// Coordinates multiple parallel I/O writers that write non-overlapping data
88/// regions to a shared output (e.g. a parallel file system or collective MPI
89/// write).
90#[derive(Debug)]
91pub struct ParallelIoCoordinator {
92    /// All registered writers.
93    pub writers: Vec<WriterMeta>,
94    /// Total output buffer (simulates shared storage).
95    pub output: Vec<u8>,
96    /// Total size of the output file.
97    pub total_size: usize,
98}
99impl ParallelIoCoordinator {
100    /// Create a new parallel I/O coordinator.
101    pub fn new(total_size: usize) -> Self {
102        Self {
103            writers: Vec::new(),
104            output: vec![0u8; total_size],
105            total_size,
106        }
107    }
108    /// Register a writer responsible for bytes `[start, end)`.
109    ///
110    /// Returns the writer ID.
111    pub fn register_writer(&mut self, start: usize, end: usize) -> usize {
112        let id = self.writers.len();
113        self.writers.push(WriterMeta {
114            id,
115            byte_range: (start, end),
116            bytes_written: 0,
117            done: false,
118        });
119        id
120    }
121    /// Submit data from writer `id`.
122    ///
123    /// Writes `data` at the registered offset.  Marks the writer as done.
124    pub fn submit(&mut self, id: usize, data: &[u8]) {
125        if id >= self.writers.len() {
126            return;
127        }
128        let (start, end) = self.writers[id].byte_range;
129        let len = data.len().min(end.saturating_sub(start));
130        if start + len <= self.output.len() {
131            self.output[start..start + len].copy_from_slice(&data[..len]);
132        }
133        self.writers[id].bytes_written = len;
134        self.writers[id].done = true;
135    }
136    /// Returns `true` when all writers have submitted their data.
137    pub fn all_done(&self) -> bool {
138        self.writers.iter().all(|w| w.done)
139    }
140    /// Total bytes written across all writers.
141    pub fn total_written(&self) -> usize {
142        self.writers.iter().map(|w| w.bytes_written).sum()
143    }
144}
145/// Controls frame sampling from a trajectory — saves every `skip_frames+1`-th frame.
146#[derive(Debug, Clone)]
147pub struct TrajectorySampler {
148    /// Number of frames to skip between sampled frames (0 = keep every frame).
149    pub skip_frames: usize,
150    /// Internal counter tracking how many frames have been seen since last sample.
151    pub current: usize,
152}
153impl TrajectorySampler {
154    /// Create a new sampler that retains every `(skip_frames+1)`-th frame.
155    pub fn new(skip_frames: usize) -> Self {
156        Self {
157            skip_frames,
158            current: 0,
159        }
160    }
161}
162/// Metadata for a parallel I/O writer.
163#[derive(Debug, Clone)]
164pub struct WriterMeta {
165    /// Writer identifier.
166    pub id: usize,
167    /// Data range this writer is responsible for (byte offsets).
168    pub byte_range: (usize, usize),
169    /// Bytes written by this writer.
170    pub bytes_written: usize,
171    /// Whether this writer has finished.
172    pub done: bool,
173}
174/// Binary protocol frame.
175#[derive(Debug, Clone)]
176pub struct ProtocolFrame {
177    /// Frame timestamp.
178    pub timestamp: f64,
179    /// Frame sequence number.
180    pub seq: u64,
181    /// Payload data.
182    pub payload: Vec<u8>,
183}
184/// Status of a non-blocking I/O operation.
185#[derive(Debug, Clone, PartialEq)]
186pub enum IoStatus {
187    /// Operation completed successfully.
188    Ready,
189    /// Operation is pending (would block).
190    Pending,
191    /// An error occurred.
192    Error(String),
193}
194/// Frame interpolator for smooth rendering between physics states.
195#[derive(Debug, Clone)]
196pub struct FrameInterpolator {
197    /// Previous frame state.
198    pub prev: Vec<f64>,
199    /// Next frame state.
200    pub next: Vec<f64>,
201    /// Previous frame timestamp.
202    pub t_prev: f64,
203    /// Next frame timestamp.
204    pub t_next: f64,
205    /// Interpolation method.
206    pub method: InterpolationMethod,
207}
208impl FrameInterpolator {
209    /// Create a new frame interpolator.
210    pub fn new(method: InterpolationMethod) -> Self {
211        Self {
212            prev: Vec::new(),
213            next: Vec::new(),
214            t_prev: 0.0,
215            t_next: 1.0,
216            method,
217        }
218    }
219    /// Update with new frame pair.
220    pub fn update(&mut self, prev: Vec<f64>, t_prev: f64, next: Vec<f64>, t_next: f64) {
221        self.prev = prev;
222        self.t_prev = t_prev;
223        self.next = next;
224        self.t_next = t_next;
225    }
226    /// Interpolate at time t.
227    pub fn interpolate(&self, t: f64) -> Vec<f64> {
228        let dt = self.t_next - self.t_prev;
229        if dt.abs() < 1e-30 {
230            return self.next.clone();
231        }
232        let alpha = ((t - self.t_prev) / dt).clamp(0.0, 1.0);
233        match self.method {
234            InterpolationMethod::Linear => self
235                .prev
236                .iter()
237                .zip(self.next.iter())
238                .map(|(&p, &n)| p + alpha * (n - p))
239                .collect(),
240            InterpolationMethod::CubicHermite => {
241                let h00 = 2.0 * alpha * alpha * alpha - 3.0 * alpha * alpha + 1.0;
242                let h01 = -2.0 * alpha * alpha * alpha + 3.0 * alpha * alpha;
243                self.prev
244                    .iter()
245                    .zip(self.next.iter())
246                    .map(|(&p, &n)| h00 * p + h01 * n)
247                    .collect()
248            }
249            InterpolationMethod::Nearest => {
250                if alpha < 0.5 {
251                    self.prev.clone()
252                } else {
253                    self.next.clone()
254                }
255            }
256        }
257    }
258    /// Error estimate between interpolated and ground truth.
259    pub fn interpolation_error(&self, gt: &[f64], t: f64) -> f64 {
260        let interp = self.interpolate(t);
261        if gt.len() != interp.len() {
262            return f64::INFINITY;
263        }
264        let sum: f64 = interp
265            .iter()
266            .zip(gt.iter())
267            .map(|(a, b)| (a - b).powi(2))
268            .sum();
269        (sum / interp.len().max(1) as f64).sqrt()
270    }
271}
272/// Mock packet for network transport.
273#[derive(Debug, Clone)]
274pub struct Packet {
275    /// Destination address (mock).
276    pub dest: String,
277    /// Data payload.
278    pub data: Vec<u8>,
279    /// Sequence number.
280    pub seq: u64,
281    /// Simulated latency in ms.
282    pub latency_ms: f64,
283}
284/// Mock physics stream server.
285///
286/// Serves physics state frames to connected mock clients.
287#[derive(Debug)]
288pub struct PhysicsStreamServer {
289    /// Port (mock).
290    pub port: u16,
291    /// Number of connected clients.
292    pub n_clients: usize,
293    /// Frame queue for broadcasting.
294    pub frame_queue: VecDeque<ProtocolFrame>,
295    /// Protocol encoder.
296    pub protocol: BinaryProtocol,
297    /// Sequence counter.
298    pub(super) seq: u64,
299    /// Running flag.
300    pub running: bool,
301}
302impl PhysicsStreamServer {
303    /// Create a new physics stream server.
304    pub fn new(port: u16) -> Self {
305        Self {
306            port,
307            n_clients: 0,
308            frame_queue: VecDeque::new(),
309            protocol: BinaryProtocol::oxiphysics(),
310            seq: 0,
311            running: false,
312        }
313    }
314    /// Start serving.
315    pub fn start(&mut self) {
316        self.running = true;
317    }
318    /// Stop serving.
319    pub fn stop(&mut self) {
320        self.running = false;
321    }
322    /// Connect a mock client.
323    pub fn connect_client(&mut self) {
324        self.n_clients += 1;
325    }
326    /// Disconnect a mock client.
327    pub fn disconnect_client(&mut self) {
328        if self.n_clients > 0 {
329            self.n_clients -= 1;
330        }
331    }
332    /// Broadcast physics state to all clients.
333    pub fn broadcast(&mut self, timestamp: f64, state: Vec<u8>) {
334        self.seq += 1;
335        let frame = ProtocolFrame {
336            timestamp,
337            seq: self.seq,
338            payload: state,
339        };
340        self.frame_queue.push_back(frame);
341    }
342    /// Get next frame from queue.
343    pub fn pop_frame(&mut self) -> Option<ProtocolFrame> {
344        self.frame_queue.pop_front()
345    }
346    /// Queue depth.
347    pub fn queue_depth(&self) -> usize {
348        self.frame_queue.len()
349    }
350}
351/// Physics state binary protocol.
352///
353/// Header: magic (4 bytes), version (1 byte), timestamp (8 bytes f64).
354/// Body: data payload.
355#[derive(Debug, Clone)]
356pub struct BinaryProtocol {
357    /// Magic bytes.
358    pub magic: [u8; 4],
359    /// Protocol version.
360    pub version: u8,
361}
362impl BinaryProtocol {
363    /// Create a new binary protocol.
364    pub fn new(magic: [u8; 4], version: u8) -> Self {
365        Self { magic, version }
366    }
367    /// Default physics protocol (magic: OXIP).
368    pub fn oxiphysics() -> Self {
369        Self::new(*b"OXIP", 1)
370    }
371    /// Encode a frame.
372    pub fn encode(&self, frame: &ProtocolFrame) -> Vec<u8> {
373        let payload_len = frame.payload.len() as u32;
374        let mut buf = Vec::new();
375        buf.extend_from_slice(&self.magic);
376        buf.push(self.version);
377        buf.extend_from_slice(&frame.timestamp.to_le_bytes());
378        buf.extend_from_slice(&frame.seq.to_le_bytes());
379        buf.extend_from_slice(&payload_len.to_le_bytes());
380        buf.extend_from_slice(&frame.payload);
381        buf
382    }
383    /// Decode a frame.
384    pub fn decode(&self, data: &[u8]) -> Option<ProtocolFrame> {
385        if data.len() < 25 {
386            return None;
387        }
388        if data[0..4] != self.magic {
389            return None;
390        }
391        if data[4] != self.version {
392            return None;
393        }
394        let timestamp = f64::from_le_bytes(data[5..13].try_into().ok()?);
395        let seq = u64::from_le_bytes(data[13..21].try_into().ok()?);
396        let payload_len = u32::from_le_bytes(data[21..25].try_into().ok()?) as usize;
397        if data.len() < 25 + payload_len {
398            return None;
399        }
400        let payload = data[25..25 + payload_len].to_vec();
401        Some(ProtocolFrame {
402            timestamp,
403            seq,
404            payload,
405        })
406    }
407    /// Frame overhead size (header bytes).
408    pub fn header_size(&self) -> usize {
409        25
410    }
411}
412/// Mock TCP/UDP transport with latency simulation.
413#[derive(Debug)]
414pub struct NetworkTransport {
415    /// Simulated latency in ms.
416    pub latency_ms: f64,
417    /// Packet loss rate (0.0..1.0).
418    pub packet_loss: f64,
419    /// Outgoing packet queue.
420    pub send_queue: VecDeque<Packet>,
421    /// Incoming packet buffer.
422    pub recv_buffer: VecDeque<Packet>,
423    /// Total packets sent.
424    pub packets_sent: u64,
425    /// Total packets dropped.
426    pub packets_dropped: u64,
427    /// Sequence counter.
428    pub(super) seq: u64,
429}
430impl NetworkTransport {
431    /// Create a new network transport.
432    pub fn new(latency_ms: f64, packet_loss: f64) -> Self {
433        Self {
434            latency_ms,
435            packet_loss,
436            send_queue: VecDeque::new(),
437            recv_buffer: VecDeque::new(),
438            packets_sent: 0,
439            packets_dropped: 0,
440            seq: 0,
441        }
442    }
443    /// Send a packet (enqueue).
444    pub fn send_packet(&mut self, dest: &str, data: Vec<u8>) {
445        self.seq += 1;
446        let packet = Packet {
447            dest: dest.to_string(),
448            data,
449            seq: self.seq,
450            latency_ms: self.latency_ms,
451        };
452        if self.seq % 100 < (self.packet_loss * 100.0) as u64 {
453            self.packets_dropped += 1;
454            return;
455        }
456        self.send_queue.push_back(packet);
457        self.packets_sent += 1;
458    }
459    /// Deliver all queued packets to recv_buffer (simulates delivery).
460    pub fn deliver(&mut self) {
461        while let Some(pkt) = self.send_queue.pop_front() {
462            self.recv_buffer.push_back(pkt);
463        }
464    }
465    /// Receive next packet.
466    pub fn receive_packet(&mut self) -> Option<Packet> {
467        self.recv_buffer.pop_front()
468    }
469    /// Queue depth.
470    pub fn queue_depth(&self) -> usize {
471        self.send_queue.len() + self.recv_buffer.len()
472    }
473}
474/// Mock physics stream client.
475///
476/// Connects to a mock server, receives frames, decodes, and interpolates.
477#[derive(Debug)]
478pub struct PhysicsStreamClient {
479    /// Server port.
480    pub port: u16,
481    /// Connected flag.
482    pub connected: bool,
483    /// Received frames buffer.
484    pub frames: VecDeque<ProtocolFrame>,
485    /// Protocol decoder.
486    pub protocol: BinaryProtocol,
487    /// Latest decoded state.
488    pub latest_state: Vec<f64>,
489    /// Previous state for interpolation.
490    pub prev_state: Vec<f64>,
491}
492impl PhysicsStreamClient {
493    /// Create a new physics stream client.
494    pub fn new(port: u16) -> Self {
495        Self {
496            port,
497            connected: false,
498            frames: VecDeque::new(),
499            protocol: BinaryProtocol::oxiphysics(),
500            latest_state: Vec::new(),
501            prev_state: Vec::new(),
502        }
503    }
504    /// Connect to server.
505    pub fn connect(&mut self) {
506        self.connected = true;
507    }
508    /// Disconnect.
509    pub fn disconnect(&mut self) {
510        self.connected = false;
511    }
512    /// Inject a frame (from server).
513    pub fn receive_frame(&mut self, frame: ProtocolFrame) {
514        self.frames.push_back(frame);
515    }
516    /// Decode the latest frame.
517    pub fn decode_latest(&mut self) {
518        if let Some(frame) = self.frames.back() {
519            self.prev_state = self.latest_state.clone();
520            let n = frame.payload.len() / 8;
521            self.latest_state = (0..n)
522                .map(|i| {
523                    f64::from_le_bytes(
524                        frame.payload[i * 8..(i + 1) * 8]
525                            .try_into()
526                            .unwrap_or([0u8; 8]),
527                    )
528                })
529                .collect();
530        }
531    }
532    /// Get interpolated state at t in \[0, 1\].
533    pub fn interpolated_state(&self, t: f64) -> Vec<f64> {
534        if self.prev_state.len() != self.latest_state.len() {
535            return self.latest_state.clone();
536        }
537        self.prev_state
538            .iter()
539            .zip(self.latest_state.iter())
540            .map(|(&p, &l)| p + t * (l - p))
541            .collect()
542    }
543    /// Number of buffered frames.
544    pub fn buffered_frames(&self) -> usize {
545        self.frames.len()
546    }
547}
548/// Chunked file reader/writer that processes data in fixed-size blocks.
549///
550/// Suitable for large datasets that exceed available memory.  Both reading
551/// and writing are abstracted as in-memory operations (no actual file I/O in
552/// this mock implementation).
553#[derive(Debug)]
554pub struct ChunkedFileBuffer {
555    /// Underlying byte storage (simulates a file).
556    pub storage: Vec<u8>,
557    /// Chunk size in bytes.
558    pub chunk_size: usize,
559    /// Current read position.
560    pub read_pos: usize,
561    /// Current write position.
562    pub write_pos: usize,
563    /// Total bytes written.
564    pub bytes_written: usize,
565}
566impl ChunkedFileBuffer {
567    /// Create a new chunked buffer with the given chunk size.
568    pub fn new(chunk_size: usize) -> Self {
569        Self {
570            storage: Vec::new(),
571            chunk_size,
572            read_pos: 0,
573            write_pos: 0,
574            bytes_written: 0,
575        }
576    }
577    /// Write a chunk of data to the buffer.
578    pub fn write_chunk(&mut self, data: &[u8]) {
579        self.storage.extend_from_slice(data);
580        self.write_pos += data.len();
581        self.bytes_written += data.len();
582    }
583    /// Read the next chunk. Returns `None` if no more data.
584    pub fn read_chunk(&mut self) -> Option<Vec<u8>> {
585        if self.read_pos >= self.storage.len() {
586            return None;
587        }
588        let end = (self.read_pos + self.chunk_size).min(self.storage.len());
589        let chunk = self.storage[self.read_pos..end].to_vec();
590        self.read_pos = end;
591        Some(chunk)
592    }
593    /// Number of full chunks available for reading.
594    pub fn available_chunks(&self) -> usize {
595        let remaining = self.storage.len().saturating_sub(self.read_pos);
596        (remaining + self.chunk_size - 1) / self.chunk_size.max(1)
597    }
598    /// Reset read cursor to beginning.
599    pub fn reset_read(&mut self) {
600        self.read_pos = 0;
601    }
602    /// Total bytes stored.
603    pub fn stored_bytes(&self) -> usize {
604        self.storage.len()
605    }
606}
607/// Streaming binary reader for files written by [`ChunkedWriter`].
608#[derive(Debug)]
609pub struct ChunkedReader {
610    /// Path to the data file.
611    pub path: String,
612    /// Total number of frames discovered during [`open`](ChunkedReader::open).
613    pub total_frames: usize,
614    /// Byte offsets for each frame.
615    pub(super) offsets: Vec<u64>,
616}
617impl ChunkedReader {
618    /// Open the file and index all frames.
619    pub fn open(path: &str) -> Result<Self, String> {
620        let data = std::fs::read(path).map_err(|e| e.to_string())?;
621        let mut offsets = Vec::new();
622        let mut pos = 0usize;
623        while pos + 8 <= data.len() {
624            let frame_len = u64::from_le_bytes(
625                data[pos..pos + 8]
626                    .try_into()
627                    .map_err(|_| "bad length field")?,
628            ) as usize;
629            offsets.push(pos as u64);
630            pos += 8 + frame_len;
631        }
632        let total = offsets.len();
633        Ok(Self {
634            path: path.into(),
635            total_frames: total,
636            offsets,
637        })
638    }
639    /// Total number of frames.
640    pub fn total_frames(&self) -> usize {
641        self.total_frames
642    }
643    /// Read the frame with the given sequential index.
644    pub fn read_chunk(&self, frame_idx: usize) -> Result<SimulationChunk, String> {
645        if frame_idx >= self.offsets.len() {
646            return Err(format!(
647                "frame_idx {frame_idx} out of range ({})",
648                self.offsets.len()
649            ));
650        }
651        use std::io::{Read, Seek, SeekFrom};
652        let mut file = std::fs::File::open(&self.path).map_err(|e| e.to_string())?;
653        file.seek(SeekFrom::Start(self.offsets[frame_idx]))
654            .map_err(|e| e.to_string())?;
655        let mut len_buf = [0u8; 8];
656        file.read_exact(&mut len_buf).map_err(|e| e.to_string())?;
657        let frame_len = u64::from_le_bytes(len_buf) as usize;
658        let mut frame_data = vec![0u8; frame_len];
659        file.read_exact(&mut frame_data)
660            .map_err(|e| e.to_string())?;
661        chunk_from_bytes(&frame_data).ok_or_else(|| "corrupt frame data".into())
662    }
663}
664/// A non-blocking (async-compatible) read/write stub that simulates
665/// waker-based I/O without requiring an async runtime.
666///
667/// In a real implementation this would wrap `tokio::fs::File` or similar.
668#[derive(Debug)]
669pub struct AsyncIoStub {
670    /// Internal buffer.
671    pub buffer: Vec<u8>,
672    /// Current read position.
673    pub pos: usize,
674    /// Simulated readiness delay: ops will return `Pending` this many times
675    /// before becoming `Ready`.
676    pub pending_ticks: usize,
677    /// Count of operations completed.
678    pub ops_completed: usize,
679}
680impl AsyncIoStub {
681    /// Create a new async I/O stub with an initial payload.
682    pub fn new(data: Vec<u8>, pending_ticks: usize) -> Self {
683        Self {
684            buffer: data,
685            pos: 0,
686            pending_ticks,
687            ops_completed: 0,
688        }
689    }
690    /// Non-blocking read of up to `len` bytes.
691    ///
692    /// Returns `Pending` while `pending_ticks > 0`, `Ready` once drained.
693    pub fn poll_read(&mut self, len: usize) -> (IoStatus, Vec<u8>) {
694        if self.pending_ticks > 0 {
695            self.pending_ticks -= 1;
696            return (IoStatus::Pending, Vec::new());
697        }
698        let end = (self.pos + len).min(self.buffer.len());
699        let data = self.buffer[self.pos..end].to_vec();
700        self.pos = end;
701        self.ops_completed += 1;
702        (IoStatus::Ready, data)
703    }
704    /// Non-blocking write.
705    ///
706    /// Returns `Pending` while `pending_ticks > 0`.
707    pub fn poll_write(&mut self, data: &[u8]) -> IoStatus {
708        if self.pending_ticks > 0 {
709            self.pending_ticks -= 1;
710            return IoStatus::Pending;
711        }
712        self.buffer.extend_from_slice(data);
713        self.ops_completed += 1;
714        IoStatus::Ready
715    }
716    /// Drive the stub until `poll_read` returns `Ready`, collecting all bytes.
717    pub fn blocking_read_all(&mut self, chunk_size: usize) -> Vec<u8> {
718        let mut out = Vec::new();
719        loop {
720            let (status, chunk) = self.poll_read(chunk_size);
721            if status == IoStatus::Pending {
722                continue;
723            }
724            if chunk.is_empty() {
725                break;
726            }
727            out.extend_from_slice(&chunk);
728        }
729        out
730    }
731}
732/// A streaming CSV parser that processes data line by line.
733///
734/// Supports optional header rows, delimiter configuration, and type-safe
735/// extraction of f64 columns.
736#[derive(Debug)]
737pub struct CsvStreamParser {
738    /// Column delimiter character.
739    pub delimiter: char,
740    /// Column headers (populated if the first row is a header).
741    pub headers: Vec<String>,
742    /// Parsed rows (each row is a Vec of raw string values).
743    pub rows: Vec<Vec<String>>,
744    /// Whether the first non-empty line should be treated as a header.
745    pub has_header: bool,
746    /// Number of parse errors encountered.
747    pub error_count: usize,
748}
749impl CsvStreamParser {
750    /// Create a new CSV parser.
751    pub fn new(delimiter: char, has_header: bool) -> Self {
752        Self {
753            delimiter,
754            headers: Vec::new(),
755            rows: Vec::new(),
756            has_header,
757            error_count: 0,
758        }
759    }
760    /// Feed a single line of text to the parser.
761    pub fn feed_line(&mut self, line: &str) {
762        let line = line.trim();
763        if line.is_empty() {
764            return;
765        }
766        let fields: Vec<String> = line
767            .split(self.delimiter)
768            .map(|s| s.trim().to_string())
769            .collect();
770        if self.has_header && self.headers.is_empty() {
771            self.headers = fields;
772        } else {
773            self.rows.push(fields);
774        }
775    }
776    /// Feed multiple lines of text.
777    pub fn feed_text(&mut self, text: &str) {
778        for line in text.lines() {
779            self.feed_line(line);
780        }
781    }
782    /// Extract a column by name as f64 values.  Returns empty if header not found.
783    pub fn column_f64(&self, name: &str) -> Vec<f64> {
784        let col_idx = self.headers.iter().position(|h| h == name);
785        match col_idx {
786            None => Vec::new(),
787            Some(idx) => self
788                .rows
789                .iter()
790                .filter_map(|row| row.get(idx).and_then(|v| v.parse::<f64>().ok()))
791                .collect(),
792        }
793    }
794    /// Extract a column by index as f64 values.
795    pub fn column_f64_by_idx(&self, idx: usize) -> Vec<f64> {
796        self.rows
797            .iter()
798            .filter_map(|row| row.get(idx).and_then(|v| v.parse::<f64>().ok()))
799            .collect()
800    }
801    /// Number of parsed data rows (excluding header).
802    pub fn row_count(&self) -> usize {
803        self.rows.len()
804    }
805}
806/// WebSocket message type.
807#[derive(Debug, Clone, PartialEq)]
808pub enum WsFrameType {
809    /// Text frame.
810    Text,
811    /// Binary frame.
812    Binary,
813    /// Ping frame.
814    Ping,
815    /// Pong frame.
816    Pong,
817    /// Close frame.
818    Close,
819}
820/// WebSocket bridge for text/binary frame framing.
821#[derive(Debug)]
822pub struct WebSocketBridge {
823    /// Buffered outgoing frames.
824    pub outgoing: VecDeque<WsFrame>,
825    /// Buffered incoming frames.
826    pub incoming: VecDeque<WsFrame>,
827    /// Connection state.
828    pub connected: bool,
829    /// Max frame size.
830    pub max_frame_size: usize,
831}
832impl WebSocketBridge {
833    /// Create a new WebSocket bridge.
834    pub fn new(max_frame_size: usize) -> Self {
835        Self {
836            outgoing: VecDeque::new(),
837            incoming: VecDeque::new(),
838            connected: false,
839            max_frame_size,
840        }
841    }
842    /// Connect.
843    pub fn connect(&mut self) {
844        self.connected = true;
845    }
846    /// Disconnect.
847    pub fn disconnect(&mut self) {
848        self.connected = false;
849        let close = WsFrame {
850            frame_type: WsFrameType::Close,
851            payload: Vec::new(),
852            fin: true,
853        };
854        self.outgoing.push_back(close);
855    }
856    /// Send text frame.
857    pub fn send_text(&mut self, text: &str) {
858        let frame = WsFrame {
859            frame_type: WsFrameType::Text,
860            payload: text.as_bytes().to_vec(),
861            fin: true,
862        };
863        self.outgoing.push_back(frame);
864    }
865    /// Send binary frame.
866    pub fn send_binary(&mut self, data: Vec<u8>) {
867        let frame = WsFrame {
868            frame_type: WsFrameType::Binary,
869            payload: data,
870            fin: true,
871        };
872        self.outgoing.push_back(frame);
873    }
874    /// Send ping.
875    pub fn ping(&mut self, data: Vec<u8>) {
876        let frame = WsFrame {
877            frame_type: WsFrameType::Ping,
878            payload: data,
879            fin: true,
880        };
881        self.outgoing.push_back(frame);
882    }
883    /// Receive next frame.
884    pub fn recv_frame(&mut self) -> Option<WsFrame> {
885        self.incoming.pop_front()
886    }
887    /// Inject an incoming frame (for testing).
888    pub fn inject_frame(&mut self, frame: WsFrame) {
889        if frame.frame_type == WsFrameType::Ping {
890            let pong = WsFrame {
891                frame_type: WsFrameType::Pong,
892                payload: frame.payload.clone(),
893                fin: true,
894            };
895            self.outgoing.push_back(pong);
896        }
897        self.incoming.push_back(frame);
898    }
899    /// Number of pending outgoing frames.
900    pub fn pending_out(&self) -> usize {
901        self.outgoing.len()
902    }
903}
904/// A simple content-addressed deduplication store.
905///
906/// Stores unique blocks identified by a 64-bit hash (FNV-1a). Duplicate
907/// blocks are not stored twice; the caller receives back the canonical hash.
908#[derive(Debug)]
909pub struct DataDeduplicator {
910    /// Canonical block store: hash → data.
911    pub store: HashMap<u64, Vec<u8>>,
912    /// Total bytes received (pre-dedup).
913    pub bytes_received: usize,
914    /// Total bytes actually stored (post-dedup).
915    pub bytes_stored: usize,
916    /// Dedup hit count.
917    pub hits: usize,
918}
919impl DataDeduplicator {
920    /// Create a new deduplicator.
921    pub fn new() -> Self {
922        Self {
923            store: HashMap::new(),
924            bytes_received: 0,
925            bytes_stored: 0,
926            hits: 0,
927        }
928    }
929    /// Ingest a block of data.  Returns the content hash.
930    ///
931    /// If the block is new, it is stored.  If it already exists, the existing
932    /// hash is returned and `hits` is incremented.
933    pub fn ingest(&mut self, data: &[u8]) -> u64 {
934        self.bytes_received += data.len();
935        let hash = fnv1a_hash(data);
936        if let std::collections::hash_map::Entry::Vacant(e) = self.store.entry(hash) {
937            e.insert(data.to_vec());
938            self.bytes_stored += data.len();
939        } else {
940            self.hits += 1;
941        }
942        hash
943    }
944    /// Retrieve a block by hash.
945    pub fn retrieve(&self, hash: u64) -> Option<&[u8]> {
946        self.store.get(&hash).map(|v| v.as_slice())
947    }
948    /// Deduplication ratio (1 - stored/received).
949    pub fn dedup_ratio(&self) -> f64 {
950        if self.bytes_received == 0 {
951            return 0.0;
952        }
953        1.0 - self.bytes_stored as f64 / self.bytes_received as f64
954    }
955    /// Number of unique blocks stored.
956    pub fn unique_blocks(&self) -> usize {
957        self.store.len()
958    }
959}
960/// A streaming writer for VTK legacy format (ASCII structured points).
961///
962/// Builds VTK output incrementally as physics data is produced, without
963/// buffering the entire dataset in memory.
964#[derive(Debug)]
965pub struct VtkStreamWriter {
966    /// Output buffer (simulates file I/O).
967    pub output: Vec<u8>,
968    /// Grid dimensions (nx, ny, nz).
969    pub dims: (usize, usize, usize),
970    /// Grid spacing (dx, dy, dz).
971    pub spacing: (f64, f64, f64),
972    /// Origin (ox, oy, oz).
973    pub origin: (f64, f64, f64),
974    /// Dataset name.
975    pub dataset_name: String,
976    /// Whether the header has been written.
977    pub header_written: bool,
978    /// Number of scalar fields written.
979    pub fields_written: usize,
980}
981impl VtkStreamWriter {
982    /// Create a new VTK stream writer.
983    pub fn new(
984        dims: (usize, usize, usize),
985        spacing: (f64, f64, f64),
986        origin: (f64, f64, f64),
987        dataset_name: &str,
988    ) -> Self {
989        Self {
990            output: Vec::new(),
991            dims,
992            spacing,
993            origin,
994            dataset_name: dataset_name.to_string(),
995            header_written: false,
996            fields_written: 0,
997        }
998    }
999    /// Write the VTK file header (must be called before any field).
1000    pub fn write_header(&mut self) {
1001        let header = format!(
1002            "# vtk DataFile Version 3.0\n{}\nASCII\nDATASET STRUCTURED_POINTS\n\
1003             DIMENSIONS {} {} {}\nSPACING {} {} {}\nORIGIN {} {} {}\nPOINT_DATA {}\n",
1004            self.dataset_name,
1005            self.dims.0,
1006            self.dims.1,
1007            self.dims.2,
1008            self.spacing.0,
1009            self.spacing.1,
1010            self.spacing.2,
1011            self.origin.0,
1012            self.origin.1,
1013            self.origin.2,
1014            self.dims.0 * self.dims.1 * self.dims.2,
1015        );
1016        self.output.extend_from_slice(header.as_bytes());
1017        self.header_written = true;
1018    }
1019    /// Write a scalar field with the given name and data.
1020    ///
1021    /// Must call `write_header` first.
1022    pub fn write_scalar_field(&mut self, field_name: &str, data: &[f64]) {
1023        if !self.header_written {
1024            self.write_header();
1025        }
1026        let mut section = format!("SCALARS {} float 1\nLOOKUP_TABLE default\n", field_name);
1027        for &v in data {
1028            section.push_str(&format!("{:.6e} ", v));
1029        }
1030        section.push('\n');
1031        self.output.extend_from_slice(section.as_bytes());
1032        self.fields_written += 1;
1033    }
1034    /// Return a string view of the accumulated output.
1035    pub fn as_str(&self) -> &str {
1036        std::str::from_utf8(&self.output).unwrap_or("")
1037    }
1038    /// Total bytes written.
1039    pub fn bytes_written(&self) -> usize {
1040        self.output.len()
1041    }
1042}
1043/// Streaming XYZ writer that appends frames one at a time.
1044#[derive(Debug)]
1045pub struct StreamingXYZWriter {
1046    /// Path to the output XYZ file.
1047    pub path: String,
1048    /// Number of frames written so far.
1049    pub frames_written: usize,
1050}
1051/// Interpolation method for frame rendering.
1052#[derive(Debug, Clone, Copy, PartialEq)]
1053pub enum InterpolationMethod {
1054    /// Linear interpolation.
1055    Linear,
1056    /// Cubic Hermite spline.
1057    CubicHermite,
1058    /// Nearest neighbor.
1059    Nearest,
1060}
1061/// Lock-free single-producer single-consumer ring buffer.
1062///
1063/// Supports push, pop, capacity, and len operations.
1064#[derive(Debug)]
1065pub struct RingBuffer<T: Clone> {
1066    /// Internal storage.
1067    pub(super) data: VecDeque<T>,
1068    /// Maximum capacity.
1069    pub(super) capacity: usize,
1070}
1071impl<T: Clone> RingBuffer<T> {
1072    /// Create a new ring buffer with given capacity.
1073    pub fn new(capacity: usize) -> Self {
1074        Self {
1075            data: VecDeque::with_capacity(capacity),
1076            capacity,
1077        }
1078    }
1079    /// Push an item. Returns true on success, false if full.
1080    pub fn push(&mut self, item: T) -> bool {
1081        if self.data.len() >= self.capacity {
1082            return false;
1083        }
1084        self.data.push_back(item);
1085        true
1086    }
1087    /// Pop an item. Returns None if empty.
1088    pub fn pop(&mut self) -> Option<T> {
1089        self.data.pop_front()
1090    }
1091    /// Current number of items.
1092    pub fn len(&self) -> usize {
1093        self.data.len()
1094    }
1095    /// Is the buffer empty?
1096    pub fn is_empty(&self) -> bool {
1097        self.data.is_empty()
1098    }
1099    /// Is the buffer full?
1100    pub fn is_full(&self) -> bool {
1101        self.data.len() >= self.capacity
1102    }
1103    /// Buffer capacity.
1104    pub fn capacity(&self) -> usize {
1105        self.capacity
1106    }
1107    /// Peek at front item without removing.
1108    pub fn peek(&self) -> Option<&T> {
1109        self.data.front()
1110    }
1111}
1112/// A bounded ring buffer that signals backpressure when it becomes too full.
1113///
1114/// The `high_watermark` fraction (e.g. 0.8) determines when backpressure is
1115/// activated; the `low_watermark` (e.g. 0.2) determines when it is released.
1116#[derive(Debug)]
1117pub struct BackpressureBuffer<T: Clone> {
1118    /// Internal ring buffer.
1119    pub(super) inner: RingBuffer<T>,
1120    /// Fraction of capacity at which backpressure is asserted (0.0–1.0).
1121    pub high_watermark: f64,
1122    /// Fraction of capacity at which backpressure is released (0.0–1.0).
1123    pub low_watermark: f64,
1124    /// Current backpressure state.
1125    pub backpressure: bool,
1126    /// Total items dropped due to backpressure.
1127    pub dropped: usize,
1128}
1129impl<T: Clone> BackpressureBuffer<T> {
1130    /// Create a new backpressure-aware buffer.
1131    pub fn new(capacity: usize, high_watermark: f64, low_watermark: f64) -> Self {
1132        Self {
1133            inner: RingBuffer::new(capacity),
1134            high_watermark,
1135            low_watermark,
1136            backpressure: false,
1137            dropped: 0,
1138        }
1139    }
1140    /// Try to push an item.  Returns `false` and increments `dropped` if
1141    /// backpressure is active or the buffer is full.
1142    pub fn push(&mut self, item: T) -> bool {
1143        let fill = self.inner.len() as f64 / self.inner.capacity() as f64;
1144        if fill >= self.high_watermark {
1145            self.backpressure = true;
1146        }
1147        if self.backpressure {
1148            self.dropped += 1;
1149            return false;
1150        }
1151        self.inner.push(item)
1152    }
1153    /// Pop an item.  May release backpressure if below `low_watermark`.
1154    pub fn pop(&mut self) -> Option<T> {
1155        let item = self.inner.pop();
1156        let fill = self.inner.len() as f64 / self.inner.capacity().max(1) as f64;
1157        if fill <= self.low_watermark {
1158            self.backpressure = false;
1159        }
1160        item
1161    }
1162    /// Current fill fraction (0.0–1.0).
1163    pub fn fill_fraction(&self) -> f64 {
1164        self.inner.len() as f64 / self.inner.capacity().max(1) as f64
1165    }
1166    /// Whether backpressure is currently active.
1167    pub fn is_backpressure(&self) -> bool {
1168        self.backpressure
1169    }
1170    /// Number of items in the buffer.
1171    pub fn len(&self) -> usize {
1172        self.inner.len()
1173    }
1174    /// Is the buffer empty?
1175    pub fn is_empty(&self) -> bool {
1176        self.inner.is_empty()
1177    }
1178}
1179/// Streaming binary writer for large simulation data sets.
1180///
1181/// Each call to [`write_chunk`](ChunkedWriter::write_chunk) appends a serialised
1182/// `SimulationChunk` to the output file.
1183#[derive(Debug)]
1184pub struct ChunkedWriter {
1185    /// Path to the output file.
1186    pub path: String,
1187    /// Number of frames written so far.
1188    pub frame_count: usize,
1189    /// Maximum particles per chunk (advisory; not enforced).
1190    pub chunk_size: usize,
1191}
1192impl ChunkedWriter {
1193    /// Create a new writer targeting `path` with advisory `chunk_size`.
1194    pub fn new(path: impl Into<String>, chunk_size: usize) -> Self {
1195        Self {
1196            path: path.into(),
1197            frame_count: 0,
1198            chunk_size,
1199        }
1200    }
1201    /// Append `chunk` to the file.
1202    ///
1203    /// Returns an error string on I/O failure.
1204    pub fn write_chunk(&mut self, chunk: &SimulationChunk) -> Result<(), String> {
1205        use std::io::Write as IoWrite;
1206        let file = std::fs::OpenOptions::new()
1207            .create(true)
1208            .append(true)
1209            .open(&self.path)
1210            .map_err(|e| e.to_string())?;
1211        let mut writer = std::io::BufWriter::new(file);
1212        let data = chunk_to_bytes(chunk);
1213        let len = (data.len() as u64).to_le_bytes();
1214        writer.write_all(&len).map_err(|e| e.to_string())?;
1215        writer.write_all(&data).map_err(|e| e.to_string())?;
1216        self.frame_count += 1;
1217        Ok(())
1218    }
1219    /// Number of frames written.
1220    pub fn frame_count(&self) -> usize {
1221        self.frame_count
1222    }
1223    /// Finalise the file (no-op in this implementation — flush is automatic).
1224    pub fn finalize(&mut self) -> Result<(), String> {
1225        Ok(())
1226    }
1227}
1228/// Lightweight LZ4-style run-length encoding for float arrays.
1229///
1230/// Encodes repeated identical values efficiently.
1231#[derive(Debug, Clone, Default)]
1232pub struct CompressionStream;
1233impl CompressionStream {
1234    /// Compress a slice of f32 values using run-length encoding.
1235    pub fn compress_f32(data: &[f32]) -> Vec<u8> {
1236        if data.is_empty() {
1237            return Vec::new();
1238        }
1239        let mut out = Vec::new();
1240        let mut i = 0;
1241        while i < data.len() {
1242            let val = data[i];
1243            let mut run = 1usize;
1244            while i + run < data.len() && data[i + run] == val && run < 255 {
1245                run += 1;
1246            }
1247            if run > 2 {
1248                out.push(0xFF);
1249                out.push(run as u8);
1250                out.extend_from_slice(&val.to_le_bytes());
1251            } else {
1252                for j in 0..run {
1253                    out.extend_from_slice(&data[i + j].to_le_bytes());
1254                }
1255            }
1256            i += run;
1257        }
1258        out
1259    }
1260    /// Decompress f32 values.
1261    pub fn decompress_f32(data: &[u8]) -> Vec<f32> {
1262        let mut out = Vec::new();
1263        let mut i = 0;
1264        while i < data.len() {
1265            if i + 4 <= data.len() && data[i] == 0xFF && i + 1 < data.len() {
1266                let count = data[i + 1] as usize;
1267                if i + 6 <= data.len() {
1268                    let val =
1269                        f32::from_le_bytes([data[i + 2], data[i + 3], data[i + 4], data[i + 5]]);
1270                    for _ in 0..count {
1271                        out.push(val);
1272                    }
1273                    i += 6;
1274                    continue;
1275                }
1276            }
1277            if i + 4 <= data.len() {
1278                let val = f32::from_le_bytes([data[i], data[i + 1], data[i + 2], data[i + 3]]);
1279                out.push(val);
1280                i += 4;
1281            } else {
1282                break;
1283            }
1284        }
1285        out
1286    }
1287    /// Compression ratio (compressed / original).
1288    pub fn compression_ratio(original: &[f32], compressed: &[u8]) -> f64 {
1289        let orig_bytes = original.len() * 4;
1290        if orig_bytes == 0 {
1291            return 1.0;
1292        }
1293        compressed.len() as f64 / orig_bytes as f64
1294    }
1295}
1296/// Frame-by-frame streaming reader for XYZ trajectory files.
1297///
1298/// Reads lazily so that large files are not loaded into memory at once.
1299#[derive(Debug)]
1300pub struct StreamingXYZReader {
1301    /// Path to the XYZ file.
1302    pub path: String,
1303    /// Index of the next frame to be read (0-based).
1304    pub current_frame: usize,
1305    /// Number of atoms per frame (detected on first read).
1306    pub n_atoms: usize,
1307    /// Internal line buffer.
1308    pub buffer: Vec<u8>,
1309}
1310/// One frame of simulation data: particle positions, velocities, and metadata.
1311#[derive(Debug, Clone)]
1312pub struct SimulationChunk {
1313    /// Sequential frame identifier.
1314    pub frame_id: usize,
1315    /// Number of particles in this frame.
1316    pub particle_count: usize,
1317    /// Particle positions `[x, y, z]` in metres.
1318    pub positions: Vec<[f32; 3]>,
1319    /// Particle velocities `[vx, vy, vz]` in m/s.
1320    pub velocities: Vec<[f32; 3]>,
1321    /// Simulation time of this frame in seconds.
1322    pub time: f64,
1323}
1324impl SimulationChunk {
1325    /// Create a new empty chunk for `frame_id` at simulation time `time`.
1326    pub fn new(frame_id: usize, time: f64) -> Self {
1327        Self {
1328            frame_id,
1329            particle_count: 0,
1330            positions: Vec::new(),
1331            velocities: Vec::new(),
1332            time,
1333        }
1334    }
1335    /// Add a particle with position `pos` and velocity `vel`.
1336    pub fn add_particle(&mut self, pos: [f32; 3], vel: [f32; 3]) {
1337        self.positions.push(pos);
1338        self.velocities.push(vel);
1339        self.particle_count += 1;
1340    }
1341    /// Number of particles.
1342    pub fn particle_count(&self) -> usize {
1343        self.particle_count
1344    }
1345    /// Approximate byte size of this chunk (positions + velocities + header).
1346    pub fn byte_size(&self) -> usize {
1347        self.particle_count * 2 * 3 * 4 + 24
1348    }
1349}
1350/// An incremental snapshot record for checkpoint/restart.
1351#[derive(Debug, Clone)]
1352pub struct Snapshot {
1353    /// Snapshot sequence number.
1354    pub seq: u64,
1355    /// Simulation time at snapshot.
1356    pub sim_time: f64,
1357    /// Delta-encoded payload (diff from previous snapshot).
1358    pub delta_payload: Vec<u8>,
1359    /// Whether this is a full (base) snapshot.
1360    pub is_full: bool,
1361}
1362/// Manages incremental checkpoint/restart for a streaming simulation.
1363///
1364/// Full snapshots are stored periodically; incremental deltas reduce I/O cost.
1365#[derive(Debug)]
1366pub struct CheckpointManager {
1367    /// Stored snapshots (base + incremental).
1368    pub snapshots: Vec<Snapshot>,
1369    /// Interval (in steps) between full snapshots.
1370    pub full_interval: u64,
1371    /// Last full snapshot index in `snapshots`.
1372    pub last_full_idx: Option<usize>,
1373    /// Current step counter.
1374    pub step: u64,
1375}
1376impl CheckpointManager {
1377    /// Create a new checkpoint manager.
1378    pub fn full_interval(full_interval: u64) -> Self {
1379        Self {
1380            snapshots: Vec::new(),
1381            full_interval,
1382            last_full_idx: None,
1383            step: 0,
1384        }
1385    }
1386    /// Record a new snapshot.  Decides automatically whether to store a full
1387    /// or incremental (delta) snapshot.
1388    ///
1389    /// `state` is the raw state bytes; `prev_state` is the previous state for
1390    /// delta encoding (ignored for full snapshots).
1391    pub fn record(&mut self, sim_time: f64, state: &[u8], prev_state: &[u8]) {
1392        self.step += 1;
1393        let is_full = self.last_full_idx.is_none() || self.step.is_multiple_of(self.full_interval);
1394        let delta_payload = if is_full {
1395            state.to_vec()
1396        } else {
1397            delta_encode(prev_state, state)
1398        };
1399        let seq = self.step;
1400        let snap = Snapshot {
1401            seq,
1402            sim_time,
1403            delta_payload,
1404            is_full,
1405        };
1406        if is_full {
1407            self.last_full_idx = Some(self.snapshots.len());
1408        }
1409        self.snapshots.push(snap);
1410    }
1411    /// Restore the state at the latest checkpoint by replaying deltas.
1412    ///
1413    /// Returns `None` if no snapshots exist.
1414    pub fn restore_latest(&self) -> Option<Vec<u8>> {
1415        if self.snapshots.is_empty() {
1416            return None;
1417        }
1418        let base_idx = self.last_full_idx?;
1419        let mut state = self.snapshots[base_idx].delta_payload.clone();
1420        for snap in &self.snapshots[base_idx + 1..] {
1421            state = delta_decode(&state, &snap.delta_payload);
1422        }
1423        Some(state)
1424    }
1425    /// Number of stored snapshots.
1426    pub fn snapshot_count(&self) -> usize {
1427        self.snapshots.len()
1428    }
1429}
1430/// Async-style streaming writer with buffered writes and flush interval.
1431#[derive(Debug)]
1432pub struct StreamWriter {
1433    /// Output buffer.
1434    pub buffer: Vec<u8>,
1435    /// Maximum buffer size before auto-flush.
1436    pub buffer_size: usize,
1437    /// Flush interval in seconds.
1438    pub flush_interval: f64,
1439    /// Last flush time.
1440    pub last_flush: f64,
1441    /// Total bytes written.
1442    pub total_written: usize,
1443    /// Flushed data (in-memory output).
1444    pub flushed: Vec<u8>,
1445}
1446impl StreamWriter {
1447    /// Create a new stream writer.
1448    pub fn new(buffer_size: usize, flush_interval: f64) -> Self {
1449        Self {
1450            buffer: Vec::with_capacity(buffer_size),
1451            buffer_size,
1452            flush_interval,
1453            last_flush: 0.0,
1454            total_written: 0,
1455            flushed: Vec::new(),
1456        }
1457    }
1458    /// Write bytes to buffer.
1459    pub fn write(&mut self, data: &[u8]) {
1460        self.buffer.extend_from_slice(data);
1461        self.total_written += data.len();
1462        if self.buffer.len() >= self.buffer_size {
1463            self.flush();
1464        }
1465    }
1466    /// Write a f64 value.
1467    pub fn write_f64(&mut self, v: f64) {
1468        self.write(&v.to_le_bytes());
1469    }
1470    /// Write a u32 value.
1471    pub fn write_u32(&mut self, v: u32) {
1472        self.write(&v.to_le_bytes());
1473    }
1474    /// Flush buffer to flushed storage.
1475    pub fn flush(&mut self) {
1476        self.flushed.extend_from_slice(&self.buffer);
1477        self.buffer.clear();
1478    }
1479    /// Check and flush if interval elapsed.
1480    pub fn maybe_flush(&mut self, current_time: f64) {
1481        if current_time - self.last_flush >= self.flush_interval {
1482            self.flush();
1483            self.last_flush = current_time;
1484        }
1485    }
1486    /// Total flushed bytes.
1487    pub fn flushed_bytes(&self) -> usize {
1488        self.flushed.len()
1489    }
1490}
1491/// In-memory trajectory cache with LRU (least-recently-used) eviction.
1492///
1493/// Stores up to `capacity` frames.  When full, the least-recently-accessed
1494/// frame is evicted to make room for the new entry.
1495#[derive(Debug)]
1496pub struct TrajectoryCache {
1497    /// Cached frames indexed by frame index.
1498    pub frames: HashMap<usize, TrajFrame>,
1499    /// Maximum number of frames to cache.
1500    pub capacity: usize,
1501    /// Global access counter for LRU ordering.
1502    pub(super) access_counter: u64,
1503    /// Total cache misses.
1504    pub misses: usize,
1505    /// Total cache hits.
1506    pub hits: usize,
1507    /// Total evictions.
1508    pub evictions: usize,
1509}
1510impl TrajectoryCache {
1511    /// Create a new trajectory cache.
1512    pub fn new(capacity: usize) -> Self {
1513        Self {
1514            frames: HashMap::new(),
1515            capacity,
1516            access_counter: 0,
1517            misses: 0,
1518            hits: 0,
1519            evictions: 0,
1520        }
1521    }
1522    /// Insert a frame into the cache.  Evicts the LRU frame if at capacity.
1523    pub fn insert(&mut self, idx: usize, time: f64, positions: Vec<f64>) {
1524        if self.frames.contains_key(&idx) {
1525            self.access_counter += 1;
1526            if let Some(f) = self.frames.get_mut(&idx) {
1527                f.last_access = self.access_counter;
1528                f.positions = positions;
1529                f.time = time;
1530            }
1531            return;
1532        }
1533        if self.frames.len() >= self.capacity {
1534            self.evict_lru();
1535        }
1536        self.access_counter += 1;
1537        self.frames.insert(
1538            idx,
1539            TrajFrame {
1540                idx,
1541                time,
1542                positions,
1543                last_access: self.access_counter,
1544            },
1545        );
1546    }
1547    /// Look up a frame. Updates `hits`/`misses` accordingly.
1548    pub fn get(&mut self, idx: usize) -> Option<&TrajFrame> {
1549        if self.frames.contains_key(&idx) {
1550            self.access_counter += 1;
1551            let ac = self.access_counter;
1552            self.frames
1553                .get_mut(&idx)
1554                .expect("key must exist in map")
1555                .last_access = ac;
1556            self.hits += 1;
1557            self.frames.get(&idx)
1558        } else {
1559            self.misses += 1;
1560            None
1561        }
1562    }
1563    /// Evict the least-recently-used frame.
1564    fn evict_lru(&mut self) {
1565        if let Some(&evict_idx) = self
1566            .frames
1567            .iter()
1568            .min_by_key(|(_, f)| f.last_access)
1569            .map(|(k, _)| k)
1570        {
1571            self.frames.remove(&evict_idx);
1572            self.evictions += 1;
1573        }
1574    }
1575    /// Number of frames currently cached.
1576    pub fn len(&self) -> usize {
1577        self.frames.len()
1578    }
1579    /// Is the cache empty?
1580    pub fn is_empty(&self) -> bool {
1581        self.frames.is_empty()
1582    }
1583    /// Cache hit ratio.
1584    pub fn hit_ratio(&self) -> f64 {
1585        let total = self.hits + self.misses;
1586        if total == 0 {
1587            return 0.0;
1588        }
1589        self.hits as f64 / total as f64
1590    }
1591}
1592/// Async-style streaming reader with frame callbacks.
1593#[derive(Debug)]
1594pub struct StreamReader {
1595    /// Input data.
1596    pub data: Vec<u8>,
1597    /// Read position.
1598    pub pos: usize,
1599    /// Frame size in bytes (fixed-size framing).
1600    pub frame_size: usize,
1601    /// Total frames read.
1602    pub frames_read: usize,
1603}
1604impl StreamReader {
1605    /// Create a new stream reader.
1606    pub fn new(data: Vec<u8>, frame_size: usize) -> Self {
1607        Self {
1608            data,
1609            pos: 0,
1610            frame_size,
1611            frames_read: 0,
1612        }
1613    }
1614    /// Read next frame. Returns None if insufficient data.
1615    pub fn read_frame(&mut self) -> Option<Vec<u8>> {
1616        if self.pos + self.frame_size > self.data.len() {
1617            return None;
1618        }
1619        let frame = self.data[self.pos..self.pos + self.frame_size].to_vec();
1620        self.pos += self.frame_size;
1621        self.frames_read += 1;
1622        Some(frame)
1623    }
1624    /// Read all frames with a callback.
1625    pub fn read_all<F: FnMut(&[u8])>(&mut self, mut callback: F) {
1626        while let Some(frame) = self.read_frame() {
1627            callback(&frame);
1628        }
1629    }
1630    /// Seek to byte offset.
1631    pub fn seek(&mut self, pos: usize) {
1632        self.pos = pos.min(self.data.len());
1633    }
1634    /// Remaining bytes.
1635    pub fn remaining(&self) -> usize {
1636        self.data.len().saturating_sub(self.pos)
1637    }
1638    /// Append more data (for streaming use).
1639    pub fn append_data(&mut self, more: &[u8]) {
1640        self.data.extend_from_slice(more);
1641    }
1642}