1use std::collections::{HashMap, VecDeque};
6
7#[allow(unused_imports)]
8use super::functions::*;
9use super::functions::{chunk_from_bytes, chunk_to_bytes, delta_decode, delta_encode, fnv1a_hash};
10
11#[derive(Debug, Clone, Default)]
17pub struct RleEncoder;
18impl RleEncoder {
19 pub fn encode(data: &[u8]) -> Vec<u8> {
21 if data.is_empty() {
22 return Vec::new();
23 }
24 let mut out = Vec::new();
25 let mut i = 0;
26 while i < data.len() {
27 let val = data[i];
28 let mut run = 1usize;
29 while i + run < data.len() && data[i + run] == val && run < 254 {
30 run += 1;
31 }
32 if run >= 3 {
33 out.push(0xFF);
34 out.push(run as u8);
35 out.push(val);
36 } else {
37 for j in 0..run {
38 out.push(data[i + j]);
39 }
40 }
41 i += run;
42 }
43 out
44 }
45 pub fn decode(data: &[u8]) -> Vec<u8> {
47 let mut out = Vec::new();
48 let mut i = 0;
49 while i < data.len() {
50 if data[i] == 0xFF && i + 2 < data.len() {
51 let count = data[i + 1] as usize;
52 let val = data[i + 2];
53 for _ in 0..count {
54 out.push(val);
55 }
56 i += 3;
57 } else {
58 out.push(data[i]);
59 i += 1;
60 }
61 }
62 out
63 }
64}
65#[derive(Debug, Clone)]
67pub struct TrajFrame {
68 pub idx: usize,
70 pub time: f64,
72 pub positions: Vec<f64>,
74 pub last_access: u64,
76}
77#[derive(Debug, Clone)]
79pub struct WsFrame {
80 pub frame_type: WsFrameType,
82 pub payload: Vec<u8>,
84 pub fin: bool,
86}
87#[derive(Debug)]
91pub struct ParallelIoCoordinator {
92 pub writers: Vec<WriterMeta>,
94 pub output: Vec<u8>,
96 pub total_size: usize,
98}
99impl ParallelIoCoordinator {
100 pub fn new(total_size: usize) -> Self {
102 Self {
103 writers: Vec::new(),
104 output: vec![0u8; total_size],
105 total_size,
106 }
107 }
108 pub fn register_writer(&mut self, start: usize, end: usize) -> usize {
112 let id = self.writers.len();
113 self.writers.push(WriterMeta {
114 id,
115 byte_range: (start, end),
116 bytes_written: 0,
117 done: false,
118 });
119 id
120 }
121 pub fn submit(&mut self, id: usize, data: &[u8]) {
125 if id >= self.writers.len() {
126 return;
127 }
128 let (start, end) = self.writers[id].byte_range;
129 let len = data.len().min(end.saturating_sub(start));
130 if start + len <= self.output.len() {
131 self.output[start..start + len].copy_from_slice(&data[..len]);
132 }
133 self.writers[id].bytes_written = len;
134 self.writers[id].done = true;
135 }
136 pub fn all_done(&self) -> bool {
138 self.writers.iter().all(|w| w.done)
139 }
140 pub fn total_written(&self) -> usize {
142 self.writers.iter().map(|w| w.bytes_written).sum()
143 }
144}
145#[derive(Debug, Clone)]
147pub struct TrajectorySampler {
148 pub skip_frames: usize,
150 pub current: usize,
152}
153impl TrajectorySampler {
154 pub fn new(skip_frames: usize) -> Self {
156 Self {
157 skip_frames,
158 current: 0,
159 }
160 }
161}
162#[derive(Debug, Clone)]
164pub struct WriterMeta {
165 pub id: usize,
167 pub byte_range: (usize, usize),
169 pub bytes_written: usize,
171 pub done: bool,
173}
174#[derive(Debug, Clone)]
176pub struct ProtocolFrame {
177 pub timestamp: f64,
179 pub seq: u64,
181 pub payload: Vec<u8>,
183}
184#[derive(Debug, Clone, PartialEq)]
186pub enum IoStatus {
187 Ready,
189 Pending,
191 Error(String),
193}
194#[derive(Debug, Clone)]
196pub struct FrameInterpolator {
197 pub prev: Vec<f64>,
199 pub next: Vec<f64>,
201 pub t_prev: f64,
203 pub t_next: f64,
205 pub method: InterpolationMethod,
207}
208impl FrameInterpolator {
209 pub fn new(method: InterpolationMethod) -> Self {
211 Self {
212 prev: Vec::new(),
213 next: Vec::new(),
214 t_prev: 0.0,
215 t_next: 1.0,
216 method,
217 }
218 }
219 pub fn update(&mut self, prev: Vec<f64>, t_prev: f64, next: Vec<f64>, t_next: f64) {
221 self.prev = prev;
222 self.t_prev = t_prev;
223 self.next = next;
224 self.t_next = t_next;
225 }
226 pub fn interpolate(&self, t: f64) -> Vec<f64> {
228 let dt = self.t_next - self.t_prev;
229 if dt.abs() < 1e-30 {
230 return self.next.clone();
231 }
232 let alpha = ((t - self.t_prev) / dt).clamp(0.0, 1.0);
233 match self.method {
234 InterpolationMethod::Linear => self
235 .prev
236 .iter()
237 .zip(self.next.iter())
238 .map(|(&p, &n)| p + alpha * (n - p))
239 .collect(),
240 InterpolationMethod::CubicHermite => {
241 let h00 = 2.0 * alpha * alpha * alpha - 3.0 * alpha * alpha + 1.0;
242 let h01 = -2.0 * alpha * alpha * alpha + 3.0 * alpha * alpha;
243 self.prev
244 .iter()
245 .zip(self.next.iter())
246 .map(|(&p, &n)| h00 * p + h01 * n)
247 .collect()
248 }
249 InterpolationMethod::Nearest => {
250 if alpha < 0.5 {
251 self.prev.clone()
252 } else {
253 self.next.clone()
254 }
255 }
256 }
257 }
258 pub fn interpolation_error(&self, gt: &[f64], t: f64) -> f64 {
260 let interp = self.interpolate(t);
261 if gt.len() != interp.len() {
262 return f64::INFINITY;
263 }
264 let sum: f64 = interp
265 .iter()
266 .zip(gt.iter())
267 .map(|(a, b)| (a - b).powi(2))
268 .sum();
269 (sum / interp.len().max(1) as f64).sqrt()
270 }
271}
272#[derive(Debug, Clone)]
274pub struct Packet {
275 pub dest: String,
277 pub data: Vec<u8>,
279 pub seq: u64,
281 pub latency_ms: f64,
283}
284#[derive(Debug)]
288pub struct PhysicsStreamServer {
289 pub port: u16,
291 pub n_clients: usize,
293 pub frame_queue: VecDeque<ProtocolFrame>,
295 pub protocol: BinaryProtocol,
297 pub(super) seq: u64,
299 pub running: bool,
301}
302impl PhysicsStreamServer {
303 pub fn new(port: u16) -> Self {
305 Self {
306 port,
307 n_clients: 0,
308 frame_queue: VecDeque::new(),
309 protocol: BinaryProtocol::oxiphysics(),
310 seq: 0,
311 running: false,
312 }
313 }
314 pub fn start(&mut self) {
316 self.running = true;
317 }
318 pub fn stop(&mut self) {
320 self.running = false;
321 }
322 pub fn connect_client(&mut self) {
324 self.n_clients += 1;
325 }
326 pub fn disconnect_client(&mut self) {
328 if self.n_clients > 0 {
329 self.n_clients -= 1;
330 }
331 }
332 pub fn broadcast(&mut self, timestamp: f64, state: Vec<u8>) {
334 self.seq += 1;
335 let frame = ProtocolFrame {
336 timestamp,
337 seq: self.seq,
338 payload: state,
339 };
340 self.frame_queue.push_back(frame);
341 }
342 pub fn pop_frame(&mut self) -> Option<ProtocolFrame> {
344 self.frame_queue.pop_front()
345 }
346 pub fn queue_depth(&self) -> usize {
348 self.frame_queue.len()
349 }
350}
351#[derive(Debug, Clone)]
356pub struct BinaryProtocol {
357 pub magic: [u8; 4],
359 pub version: u8,
361}
362impl BinaryProtocol {
363 pub fn new(magic: [u8; 4], version: u8) -> Self {
365 Self { magic, version }
366 }
367 pub fn oxiphysics() -> Self {
369 Self::new(*b"OXIP", 1)
370 }
371 pub fn encode(&self, frame: &ProtocolFrame) -> Vec<u8> {
373 let payload_len = frame.payload.len() as u32;
374 let mut buf = Vec::new();
375 buf.extend_from_slice(&self.magic);
376 buf.push(self.version);
377 buf.extend_from_slice(&frame.timestamp.to_le_bytes());
378 buf.extend_from_slice(&frame.seq.to_le_bytes());
379 buf.extend_from_slice(&payload_len.to_le_bytes());
380 buf.extend_from_slice(&frame.payload);
381 buf
382 }
383 pub fn decode(&self, data: &[u8]) -> Option<ProtocolFrame> {
385 if data.len() < 25 {
386 return None;
387 }
388 if data[0..4] != self.magic {
389 return None;
390 }
391 if data[4] != self.version {
392 return None;
393 }
394 let timestamp = f64::from_le_bytes(data[5..13].try_into().ok()?);
395 let seq = u64::from_le_bytes(data[13..21].try_into().ok()?);
396 let payload_len = u32::from_le_bytes(data[21..25].try_into().ok()?) as usize;
397 if data.len() < 25 + payload_len {
398 return None;
399 }
400 let payload = data[25..25 + payload_len].to_vec();
401 Some(ProtocolFrame {
402 timestamp,
403 seq,
404 payload,
405 })
406 }
407 pub fn header_size(&self) -> usize {
409 25
410 }
411}
412#[derive(Debug)]
414pub struct NetworkTransport {
415 pub latency_ms: f64,
417 pub packet_loss: f64,
419 pub send_queue: VecDeque<Packet>,
421 pub recv_buffer: VecDeque<Packet>,
423 pub packets_sent: u64,
425 pub packets_dropped: u64,
427 pub(super) seq: u64,
429}
430impl NetworkTransport {
431 pub fn new(latency_ms: f64, packet_loss: f64) -> Self {
433 Self {
434 latency_ms,
435 packet_loss,
436 send_queue: VecDeque::new(),
437 recv_buffer: VecDeque::new(),
438 packets_sent: 0,
439 packets_dropped: 0,
440 seq: 0,
441 }
442 }
443 pub fn send_packet(&mut self, dest: &str, data: Vec<u8>) {
445 self.seq += 1;
446 let packet = Packet {
447 dest: dest.to_string(),
448 data,
449 seq: self.seq,
450 latency_ms: self.latency_ms,
451 };
452 if self.seq % 100 < (self.packet_loss * 100.0) as u64 {
453 self.packets_dropped += 1;
454 return;
455 }
456 self.send_queue.push_back(packet);
457 self.packets_sent += 1;
458 }
459 pub fn deliver(&mut self) {
461 while let Some(pkt) = self.send_queue.pop_front() {
462 self.recv_buffer.push_back(pkt);
463 }
464 }
465 pub fn receive_packet(&mut self) -> Option<Packet> {
467 self.recv_buffer.pop_front()
468 }
469 pub fn queue_depth(&self) -> usize {
471 self.send_queue.len() + self.recv_buffer.len()
472 }
473}
474#[derive(Debug)]
478pub struct PhysicsStreamClient {
479 pub port: u16,
481 pub connected: bool,
483 pub frames: VecDeque<ProtocolFrame>,
485 pub protocol: BinaryProtocol,
487 pub latest_state: Vec<f64>,
489 pub prev_state: Vec<f64>,
491}
492impl PhysicsStreamClient {
493 pub fn new(port: u16) -> Self {
495 Self {
496 port,
497 connected: false,
498 frames: VecDeque::new(),
499 protocol: BinaryProtocol::oxiphysics(),
500 latest_state: Vec::new(),
501 prev_state: Vec::new(),
502 }
503 }
504 pub fn connect(&mut self) {
506 self.connected = true;
507 }
508 pub fn disconnect(&mut self) {
510 self.connected = false;
511 }
512 pub fn receive_frame(&mut self, frame: ProtocolFrame) {
514 self.frames.push_back(frame);
515 }
516 pub fn decode_latest(&mut self) {
518 if let Some(frame) = self.frames.back() {
519 self.prev_state = self.latest_state.clone();
520 let n = frame.payload.len() / 8;
521 self.latest_state = (0..n)
522 .map(|i| {
523 f64::from_le_bytes(
524 frame.payload[i * 8..(i + 1) * 8]
525 .try_into()
526 .unwrap_or([0u8; 8]),
527 )
528 })
529 .collect();
530 }
531 }
532 pub fn interpolated_state(&self, t: f64) -> Vec<f64> {
534 if self.prev_state.len() != self.latest_state.len() {
535 return self.latest_state.clone();
536 }
537 self.prev_state
538 .iter()
539 .zip(self.latest_state.iter())
540 .map(|(&p, &l)| p + t * (l - p))
541 .collect()
542 }
543 pub fn buffered_frames(&self) -> usize {
545 self.frames.len()
546 }
547}
548#[derive(Debug)]
554pub struct ChunkedFileBuffer {
555 pub storage: Vec<u8>,
557 pub chunk_size: usize,
559 pub read_pos: usize,
561 pub write_pos: usize,
563 pub bytes_written: usize,
565}
566impl ChunkedFileBuffer {
567 pub fn new(chunk_size: usize) -> Self {
569 Self {
570 storage: Vec::new(),
571 chunk_size,
572 read_pos: 0,
573 write_pos: 0,
574 bytes_written: 0,
575 }
576 }
577 pub fn write_chunk(&mut self, data: &[u8]) {
579 self.storage.extend_from_slice(data);
580 self.write_pos += data.len();
581 self.bytes_written += data.len();
582 }
583 pub fn read_chunk(&mut self) -> Option<Vec<u8>> {
585 if self.read_pos >= self.storage.len() {
586 return None;
587 }
588 let end = (self.read_pos + self.chunk_size).min(self.storage.len());
589 let chunk = self.storage[self.read_pos..end].to_vec();
590 self.read_pos = end;
591 Some(chunk)
592 }
593 pub fn available_chunks(&self) -> usize {
595 let remaining = self.storage.len().saturating_sub(self.read_pos);
596 (remaining + self.chunk_size - 1) / self.chunk_size.max(1)
597 }
598 pub fn reset_read(&mut self) {
600 self.read_pos = 0;
601 }
602 pub fn stored_bytes(&self) -> usize {
604 self.storage.len()
605 }
606}
607#[derive(Debug)]
609pub struct ChunkedReader {
610 pub path: String,
612 pub total_frames: usize,
614 pub(super) offsets: Vec<u64>,
616}
617impl ChunkedReader {
618 pub fn open(path: &str) -> Result<Self, String> {
620 let data = std::fs::read(path).map_err(|e| e.to_string())?;
621 let mut offsets = Vec::new();
622 let mut pos = 0usize;
623 while pos + 8 <= data.len() {
624 let frame_len = u64::from_le_bytes(
625 data[pos..pos + 8]
626 .try_into()
627 .map_err(|_| "bad length field")?,
628 ) as usize;
629 offsets.push(pos as u64);
630 pos += 8 + frame_len;
631 }
632 let total = offsets.len();
633 Ok(Self {
634 path: path.into(),
635 total_frames: total,
636 offsets,
637 })
638 }
639 pub fn total_frames(&self) -> usize {
641 self.total_frames
642 }
643 pub fn read_chunk(&self, frame_idx: usize) -> Result<SimulationChunk, String> {
645 if frame_idx >= self.offsets.len() {
646 return Err(format!(
647 "frame_idx {frame_idx} out of range ({})",
648 self.offsets.len()
649 ));
650 }
651 use std::io::{Read, Seek, SeekFrom};
652 let mut file = std::fs::File::open(&self.path).map_err(|e| e.to_string())?;
653 file.seek(SeekFrom::Start(self.offsets[frame_idx]))
654 .map_err(|e| e.to_string())?;
655 let mut len_buf = [0u8; 8];
656 file.read_exact(&mut len_buf).map_err(|e| e.to_string())?;
657 let frame_len = u64::from_le_bytes(len_buf) as usize;
658 let mut frame_data = vec![0u8; frame_len];
659 file.read_exact(&mut frame_data)
660 .map_err(|e| e.to_string())?;
661 chunk_from_bytes(&frame_data).ok_or_else(|| "corrupt frame data".into())
662 }
663}
664#[derive(Debug)]
669pub struct AsyncIoStub {
670 pub buffer: Vec<u8>,
672 pub pos: usize,
674 pub pending_ticks: usize,
677 pub ops_completed: usize,
679}
680impl AsyncIoStub {
681 pub fn new(data: Vec<u8>, pending_ticks: usize) -> Self {
683 Self {
684 buffer: data,
685 pos: 0,
686 pending_ticks,
687 ops_completed: 0,
688 }
689 }
690 pub fn poll_read(&mut self, len: usize) -> (IoStatus, Vec<u8>) {
694 if self.pending_ticks > 0 {
695 self.pending_ticks -= 1;
696 return (IoStatus::Pending, Vec::new());
697 }
698 let end = (self.pos + len).min(self.buffer.len());
699 let data = self.buffer[self.pos..end].to_vec();
700 self.pos = end;
701 self.ops_completed += 1;
702 (IoStatus::Ready, data)
703 }
704 pub fn poll_write(&mut self, data: &[u8]) -> IoStatus {
708 if self.pending_ticks > 0 {
709 self.pending_ticks -= 1;
710 return IoStatus::Pending;
711 }
712 self.buffer.extend_from_slice(data);
713 self.ops_completed += 1;
714 IoStatus::Ready
715 }
716 pub fn blocking_read_all(&mut self, chunk_size: usize) -> Vec<u8> {
718 let mut out = Vec::new();
719 loop {
720 let (status, chunk) = self.poll_read(chunk_size);
721 if status == IoStatus::Pending {
722 continue;
723 }
724 if chunk.is_empty() {
725 break;
726 }
727 out.extend_from_slice(&chunk);
728 }
729 out
730 }
731}
732#[derive(Debug)]
737pub struct CsvStreamParser {
738 pub delimiter: char,
740 pub headers: Vec<String>,
742 pub rows: Vec<Vec<String>>,
744 pub has_header: bool,
746 pub error_count: usize,
748}
749impl CsvStreamParser {
750 pub fn new(delimiter: char, has_header: bool) -> Self {
752 Self {
753 delimiter,
754 headers: Vec::new(),
755 rows: Vec::new(),
756 has_header,
757 error_count: 0,
758 }
759 }
760 pub fn feed_line(&mut self, line: &str) {
762 let line = line.trim();
763 if line.is_empty() {
764 return;
765 }
766 let fields: Vec<String> = line
767 .split(self.delimiter)
768 .map(|s| s.trim().to_string())
769 .collect();
770 if self.has_header && self.headers.is_empty() {
771 self.headers = fields;
772 } else {
773 self.rows.push(fields);
774 }
775 }
776 pub fn feed_text(&mut self, text: &str) {
778 for line in text.lines() {
779 self.feed_line(line);
780 }
781 }
782 pub fn column_f64(&self, name: &str) -> Vec<f64> {
784 let col_idx = self.headers.iter().position(|h| h == name);
785 match col_idx {
786 None => Vec::new(),
787 Some(idx) => self
788 .rows
789 .iter()
790 .filter_map(|row| row.get(idx).and_then(|v| v.parse::<f64>().ok()))
791 .collect(),
792 }
793 }
794 pub fn column_f64_by_idx(&self, idx: usize) -> Vec<f64> {
796 self.rows
797 .iter()
798 .filter_map(|row| row.get(idx).and_then(|v| v.parse::<f64>().ok()))
799 .collect()
800 }
801 pub fn row_count(&self) -> usize {
803 self.rows.len()
804 }
805}
806#[derive(Debug, Clone, PartialEq)]
808pub enum WsFrameType {
809 Text,
811 Binary,
813 Ping,
815 Pong,
817 Close,
819}
820#[derive(Debug)]
822pub struct WebSocketBridge {
823 pub outgoing: VecDeque<WsFrame>,
825 pub incoming: VecDeque<WsFrame>,
827 pub connected: bool,
829 pub max_frame_size: usize,
831}
832impl WebSocketBridge {
833 pub fn new(max_frame_size: usize) -> Self {
835 Self {
836 outgoing: VecDeque::new(),
837 incoming: VecDeque::new(),
838 connected: false,
839 max_frame_size,
840 }
841 }
842 pub fn connect(&mut self) {
844 self.connected = true;
845 }
846 pub fn disconnect(&mut self) {
848 self.connected = false;
849 let close = WsFrame {
850 frame_type: WsFrameType::Close,
851 payload: Vec::new(),
852 fin: true,
853 };
854 self.outgoing.push_back(close);
855 }
856 pub fn send_text(&mut self, text: &str) {
858 let frame = WsFrame {
859 frame_type: WsFrameType::Text,
860 payload: text.as_bytes().to_vec(),
861 fin: true,
862 };
863 self.outgoing.push_back(frame);
864 }
865 pub fn send_binary(&mut self, data: Vec<u8>) {
867 let frame = WsFrame {
868 frame_type: WsFrameType::Binary,
869 payload: data,
870 fin: true,
871 };
872 self.outgoing.push_back(frame);
873 }
874 pub fn ping(&mut self, data: Vec<u8>) {
876 let frame = WsFrame {
877 frame_type: WsFrameType::Ping,
878 payload: data,
879 fin: true,
880 };
881 self.outgoing.push_back(frame);
882 }
883 pub fn recv_frame(&mut self) -> Option<WsFrame> {
885 self.incoming.pop_front()
886 }
887 pub fn inject_frame(&mut self, frame: WsFrame) {
889 if frame.frame_type == WsFrameType::Ping {
890 let pong = WsFrame {
891 frame_type: WsFrameType::Pong,
892 payload: frame.payload.clone(),
893 fin: true,
894 };
895 self.outgoing.push_back(pong);
896 }
897 self.incoming.push_back(frame);
898 }
899 pub fn pending_out(&self) -> usize {
901 self.outgoing.len()
902 }
903}
904#[derive(Debug)]
909pub struct DataDeduplicator {
910 pub store: HashMap<u64, Vec<u8>>,
912 pub bytes_received: usize,
914 pub bytes_stored: usize,
916 pub hits: usize,
918}
919impl DataDeduplicator {
920 pub fn new() -> Self {
922 Self {
923 store: HashMap::new(),
924 bytes_received: 0,
925 bytes_stored: 0,
926 hits: 0,
927 }
928 }
929 pub fn ingest(&mut self, data: &[u8]) -> u64 {
934 self.bytes_received += data.len();
935 let hash = fnv1a_hash(data);
936 if let std::collections::hash_map::Entry::Vacant(e) = self.store.entry(hash) {
937 e.insert(data.to_vec());
938 self.bytes_stored += data.len();
939 } else {
940 self.hits += 1;
941 }
942 hash
943 }
944 pub fn retrieve(&self, hash: u64) -> Option<&[u8]> {
946 self.store.get(&hash).map(|v| v.as_slice())
947 }
948 pub fn dedup_ratio(&self) -> f64 {
950 if self.bytes_received == 0 {
951 return 0.0;
952 }
953 1.0 - self.bytes_stored as f64 / self.bytes_received as f64
954 }
955 pub fn unique_blocks(&self) -> usize {
957 self.store.len()
958 }
959}
960#[derive(Debug)]
965pub struct VtkStreamWriter {
966 pub output: Vec<u8>,
968 pub dims: (usize, usize, usize),
970 pub spacing: (f64, f64, f64),
972 pub origin: (f64, f64, f64),
974 pub dataset_name: String,
976 pub header_written: bool,
978 pub fields_written: usize,
980}
981impl VtkStreamWriter {
982 pub fn new(
984 dims: (usize, usize, usize),
985 spacing: (f64, f64, f64),
986 origin: (f64, f64, f64),
987 dataset_name: &str,
988 ) -> Self {
989 Self {
990 output: Vec::new(),
991 dims,
992 spacing,
993 origin,
994 dataset_name: dataset_name.to_string(),
995 header_written: false,
996 fields_written: 0,
997 }
998 }
999 pub fn write_header(&mut self) {
1001 let header = format!(
1002 "# vtk DataFile Version 3.0\n{}\nASCII\nDATASET STRUCTURED_POINTS\n\
1003 DIMENSIONS {} {} {}\nSPACING {} {} {}\nORIGIN {} {} {}\nPOINT_DATA {}\n",
1004 self.dataset_name,
1005 self.dims.0,
1006 self.dims.1,
1007 self.dims.2,
1008 self.spacing.0,
1009 self.spacing.1,
1010 self.spacing.2,
1011 self.origin.0,
1012 self.origin.1,
1013 self.origin.2,
1014 self.dims.0 * self.dims.1 * self.dims.2,
1015 );
1016 self.output.extend_from_slice(header.as_bytes());
1017 self.header_written = true;
1018 }
1019 pub fn write_scalar_field(&mut self, field_name: &str, data: &[f64]) {
1023 if !self.header_written {
1024 self.write_header();
1025 }
1026 let mut section = format!("SCALARS {} float 1\nLOOKUP_TABLE default\n", field_name);
1027 for &v in data {
1028 section.push_str(&format!("{:.6e} ", v));
1029 }
1030 section.push('\n');
1031 self.output.extend_from_slice(section.as_bytes());
1032 self.fields_written += 1;
1033 }
1034 pub fn as_str(&self) -> &str {
1036 std::str::from_utf8(&self.output).unwrap_or("")
1037 }
1038 pub fn bytes_written(&self) -> usize {
1040 self.output.len()
1041 }
1042}
1043#[derive(Debug)]
1045pub struct StreamingXYZWriter {
1046 pub path: String,
1048 pub frames_written: usize,
1050}
1051#[derive(Debug, Clone, Copy, PartialEq)]
1053pub enum InterpolationMethod {
1054 Linear,
1056 CubicHermite,
1058 Nearest,
1060}
1061#[derive(Debug)]
1065pub struct RingBuffer<T: Clone> {
1066 pub(super) data: VecDeque<T>,
1068 pub(super) capacity: usize,
1070}
1071impl<T: Clone> RingBuffer<T> {
1072 pub fn new(capacity: usize) -> Self {
1074 Self {
1075 data: VecDeque::with_capacity(capacity),
1076 capacity,
1077 }
1078 }
1079 pub fn push(&mut self, item: T) -> bool {
1081 if self.data.len() >= self.capacity {
1082 return false;
1083 }
1084 self.data.push_back(item);
1085 true
1086 }
1087 pub fn pop(&mut self) -> Option<T> {
1089 self.data.pop_front()
1090 }
1091 pub fn len(&self) -> usize {
1093 self.data.len()
1094 }
1095 pub fn is_empty(&self) -> bool {
1097 self.data.is_empty()
1098 }
1099 pub fn is_full(&self) -> bool {
1101 self.data.len() >= self.capacity
1102 }
1103 pub fn capacity(&self) -> usize {
1105 self.capacity
1106 }
1107 pub fn peek(&self) -> Option<&T> {
1109 self.data.front()
1110 }
1111}
1112#[derive(Debug)]
1117pub struct BackpressureBuffer<T: Clone> {
1118 pub(super) inner: RingBuffer<T>,
1120 pub high_watermark: f64,
1122 pub low_watermark: f64,
1124 pub backpressure: bool,
1126 pub dropped: usize,
1128}
1129impl<T: Clone> BackpressureBuffer<T> {
1130 pub fn new(capacity: usize, high_watermark: f64, low_watermark: f64) -> Self {
1132 Self {
1133 inner: RingBuffer::new(capacity),
1134 high_watermark,
1135 low_watermark,
1136 backpressure: false,
1137 dropped: 0,
1138 }
1139 }
1140 pub fn push(&mut self, item: T) -> bool {
1143 let fill = self.inner.len() as f64 / self.inner.capacity() as f64;
1144 if fill >= self.high_watermark {
1145 self.backpressure = true;
1146 }
1147 if self.backpressure {
1148 self.dropped += 1;
1149 return false;
1150 }
1151 self.inner.push(item)
1152 }
1153 pub fn pop(&mut self) -> Option<T> {
1155 let item = self.inner.pop();
1156 let fill = self.inner.len() as f64 / self.inner.capacity().max(1) as f64;
1157 if fill <= self.low_watermark {
1158 self.backpressure = false;
1159 }
1160 item
1161 }
1162 pub fn fill_fraction(&self) -> f64 {
1164 self.inner.len() as f64 / self.inner.capacity().max(1) as f64
1165 }
1166 pub fn is_backpressure(&self) -> bool {
1168 self.backpressure
1169 }
1170 pub fn len(&self) -> usize {
1172 self.inner.len()
1173 }
1174 pub fn is_empty(&self) -> bool {
1176 self.inner.is_empty()
1177 }
1178}
1179#[derive(Debug)]
1184pub struct ChunkedWriter {
1185 pub path: String,
1187 pub frame_count: usize,
1189 pub chunk_size: usize,
1191}
1192impl ChunkedWriter {
1193 pub fn new(path: impl Into<String>, chunk_size: usize) -> Self {
1195 Self {
1196 path: path.into(),
1197 frame_count: 0,
1198 chunk_size,
1199 }
1200 }
1201 pub fn write_chunk(&mut self, chunk: &SimulationChunk) -> Result<(), String> {
1205 use std::io::Write as IoWrite;
1206 let file = std::fs::OpenOptions::new()
1207 .create(true)
1208 .append(true)
1209 .open(&self.path)
1210 .map_err(|e| e.to_string())?;
1211 let mut writer = std::io::BufWriter::new(file);
1212 let data = chunk_to_bytes(chunk);
1213 let len = (data.len() as u64).to_le_bytes();
1214 writer.write_all(&len).map_err(|e| e.to_string())?;
1215 writer.write_all(&data).map_err(|e| e.to_string())?;
1216 self.frame_count += 1;
1217 Ok(())
1218 }
1219 pub fn frame_count(&self) -> usize {
1221 self.frame_count
1222 }
1223 pub fn finalize(&mut self) -> Result<(), String> {
1225 Ok(())
1226 }
1227}
1228#[derive(Debug, Clone, Default)]
1232pub struct CompressionStream;
1233impl CompressionStream {
1234 pub fn compress_f32(data: &[f32]) -> Vec<u8> {
1236 if data.is_empty() {
1237 return Vec::new();
1238 }
1239 let mut out = Vec::new();
1240 let mut i = 0;
1241 while i < data.len() {
1242 let val = data[i];
1243 let mut run = 1usize;
1244 while i + run < data.len() && data[i + run] == val && run < 255 {
1245 run += 1;
1246 }
1247 if run > 2 {
1248 out.push(0xFF);
1249 out.push(run as u8);
1250 out.extend_from_slice(&val.to_le_bytes());
1251 } else {
1252 for j in 0..run {
1253 out.extend_from_slice(&data[i + j].to_le_bytes());
1254 }
1255 }
1256 i += run;
1257 }
1258 out
1259 }
1260 pub fn decompress_f32(data: &[u8]) -> Vec<f32> {
1262 let mut out = Vec::new();
1263 let mut i = 0;
1264 while i < data.len() {
1265 if i + 4 <= data.len() && data[i] == 0xFF && i + 1 < data.len() {
1266 let count = data[i + 1] as usize;
1267 if i + 6 <= data.len() {
1268 let val =
1269 f32::from_le_bytes([data[i + 2], data[i + 3], data[i + 4], data[i + 5]]);
1270 for _ in 0..count {
1271 out.push(val);
1272 }
1273 i += 6;
1274 continue;
1275 }
1276 }
1277 if i + 4 <= data.len() {
1278 let val = f32::from_le_bytes([data[i], data[i + 1], data[i + 2], data[i + 3]]);
1279 out.push(val);
1280 i += 4;
1281 } else {
1282 break;
1283 }
1284 }
1285 out
1286 }
1287 pub fn compression_ratio(original: &[f32], compressed: &[u8]) -> f64 {
1289 let orig_bytes = original.len() * 4;
1290 if orig_bytes == 0 {
1291 return 1.0;
1292 }
1293 compressed.len() as f64 / orig_bytes as f64
1294 }
1295}
1296#[derive(Debug)]
1300pub struct StreamingXYZReader {
1301 pub path: String,
1303 pub current_frame: usize,
1305 pub n_atoms: usize,
1307 pub buffer: Vec<u8>,
1309}
1310#[derive(Debug, Clone)]
1312pub struct SimulationChunk {
1313 pub frame_id: usize,
1315 pub particle_count: usize,
1317 pub positions: Vec<[f32; 3]>,
1319 pub velocities: Vec<[f32; 3]>,
1321 pub time: f64,
1323}
1324impl SimulationChunk {
1325 pub fn new(frame_id: usize, time: f64) -> Self {
1327 Self {
1328 frame_id,
1329 particle_count: 0,
1330 positions: Vec::new(),
1331 velocities: Vec::new(),
1332 time,
1333 }
1334 }
1335 pub fn add_particle(&mut self, pos: [f32; 3], vel: [f32; 3]) {
1337 self.positions.push(pos);
1338 self.velocities.push(vel);
1339 self.particle_count += 1;
1340 }
1341 pub fn particle_count(&self) -> usize {
1343 self.particle_count
1344 }
1345 pub fn byte_size(&self) -> usize {
1347 self.particle_count * 2 * 3 * 4 + 24
1348 }
1349}
1350#[derive(Debug, Clone)]
1352pub struct Snapshot {
1353 pub seq: u64,
1355 pub sim_time: f64,
1357 pub delta_payload: Vec<u8>,
1359 pub is_full: bool,
1361}
1362#[derive(Debug)]
1366pub struct CheckpointManager {
1367 pub snapshots: Vec<Snapshot>,
1369 pub full_interval: u64,
1371 pub last_full_idx: Option<usize>,
1373 pub step: u64,
1375}
1376impl CheckpointManager {
1377 pub fn full_interval(full_interval: u64) -> Self {
1379 Self {
1380 snapshots: Vec::new(),
1381 full_interval,
1382 last_full_idx: None,
1383 step: 0,
1384 }
1385 }
1386 pub fn record(&mut self, sim_time: f64, state: &[u8], prev_state: &[u8]) {
1392 self.step += 1;
1393 let is_full = self.last_full_idx.is_none() || self.step.is_multiple_of(self.full_interval);
1394 let delta_payload = if is_full {
1395 state.to_vec()
1396 } else {
1397 delta_encode(prev_state, state)
1398 };
1399 let seq = self.step;
1400 let snap = Snapshot {
1401 seq,
1402 sim_time,
1403 delta_payload,
1404 is_full,
1405 };
1406 if is_full {
1407 self.last_full_idx = Some(self.snapshots.len());
1408 }
1409 self.snapshots.push(snap);
1410 }
1411 pub fn restore_latest(&self) -> Option<Vec<u8>> {
1415 if self.snapshots.is_empty() {
1416 return None;
1417 }
1418 let base_idx = self.last_full_idx?;
1419 let mut state = self.snapshots[base_idx].delta_payload.clone();
1420 for snap in &self.snapshots[base_idx + 1..] {
1421 state = delta_decode(&state, &snap.delta_payload);
1422 }
1423 Some(state)
1424 }
1425 pub fn snapshot_count(&self) -> usize {
1427 self.snapshots.len()
1428 }
1429}
1430#[derive(Debug)]
1432pub struct StreamWriter {
1433 pub buffer: Vec<u8>,
1435 pub buffer_size: usize,
1437 pub flush_interval: f64,
1439 pub last_flush: f64,
1441 pub total_written: usize,
1443 pub flushed: Vec<u8>,
1445}
1446impl StreamWriter {
1447 pub fn new(buffer_size: usize, flush_interval: f64) -> Self {
1449 Self {
1450 buffer: Vec::with_capacity(buffer_size),
1451 buffer_size,
1452 flush_interval,
1453 last_flush: 0.0,
1454 total_written: 0,
1455 flushed: Vec::new(),
1456 }
1457 }
1458 pub fn write(&mut self, data: &[u8]) {
1460 self.buffer.extend_from_slice(data);
1461 self.total_written += data.len();
1462 if self.buffer.len() >= self.buffer_size {
1463 self.flush();
1464 }
1465 }
1466 pub fn write_f64(&mut self, v: f64) {
1468 self.write(&v.to_le_bytes());
1469 }
1470 pub fn write_u32(&mut self, v: u32) {
1472 self.write(&v.to_le_bytes());
1473 }
1474 pub fn flush(&mut self) {
1476 self.flushed.extend_from_slice(&self.buffer);
1477 self.buffer.clear();
1478 }
1479 pub fn maybe_flush(&mut self, current_time: f64) {
1481 if current_time - self.last_flush >= self.flush_interval {
1482 self.flush();
1483 self.last_flush = current_time;
1484 }
1485 }
1486 pub fn flushed_bytes(&self) -> usize {
1488 self.flushed.len()
1489 }
1490}
1491#[derive(Debug)]
1496pub struct TrajectoryCache {
1497 pub frames: HashMap<usize, TrajFrame>,
1499 pub capacity: usize,
1501 pub(super) access_counter: u64,
1503 pub misses: usize,
1505 pub hits: usize,
1507 pub evictions: usize,
1509}
1510impl TrajectoryCache {
1511 pub fn new(capacity: usize) -> Self {
1513 Self {
1514 frames: HashMap::new(),
1515 capacity,
1516 access_counter: 0,
1517 misses: 0,
1518 hits: 0,
1519 evictions: 0,
1520 }
1521 }
1522 pub fn insert(&mut self, idx: usize, time: f64, positions: Vec<f64>) {
1524 if self.frames.contains_key(&idx) {
1525 self.access_counter += 1;
1526 if let Some(f) = self.frames.get_mut(&idx) {
1527 f.last_access = self.access_counter;
1528 f.positions = positions;
1529 f.time = time;
1530 }
1531 return;
1532 }
1533 if self.frames.len() >= self.capacity {
1534 self.evict_lru();
1535 }
1536 self.access_counter += 1;
1537 self.frames.insert(
1538 idx,
1539 TrajFrame {
1540 idx,
1541 time,
1542 positions,
1543 last_access: self.access_counter,
1544 },
1545 );
1546 }
1547 pub fn get(&mut self, idx: usize) -> Option<&TrajFrame> {
1549 if self.frames.contains_key(&idx) {
1550 self.access_counter += 1;
1551 let ac = self.access_counter;
1552 self.frames
1553 .get_mut(&idx)
1554 .expect("key must exist in map")
1555 .last_access = ac;
1556 self.hits += 1;
1557 self.frames.get(&idx)
1558 } else {
1559 self.misses += 1;
1560 None
1561 }
1562 }
1563 fn evict_lru(&mut self) {
1565 if let Some(&evict_idx) = self
1566 .frames
1567 .iter()
1568 .min_by_key(|(_, f)| f.last_access)
1569 .map(|(k, _)| k)
1570 {
1571 self.frames.remove(&evict_idx);
1572 self.evictions += 1;
1573 }
1574 }
1575 pub fn len(&self) -> usize {
1577 self.frames.len()
1578 }
1579 pub fn is_empty(&self) -> bool {
1581 self.frames.is_empty()
1582 }
1583 pub fn hit_ratio(&self) -> f64 {
1585 let total = self.hits + self.misses;
1586 if total == 0 {
1587 return 0.0;
1588 }
1589 self.hits as f64 / total as f64
1590 }
1591}
1592#[derive(Debug)]
1594pub struct StreamReader {
1595 pub data: Vec<u8>,
1597 pub pos: usize,
1599 pub frame_size: usize,
1601 pub frames_read: usize,
1603}
1604impl StreamReader {
1605 pub fn new(data: Vec<u8>, frame_size: usize) -> Self {
1607 Self {
1608 data,
1609 pos: 0,
1610 frame_size,
1611 frames_read: 0,
1612 }
1613 }
1614 pub fn read_frame(&mut self) -> Option<Vec<u8>> {
1616 if self.pos + self.frame_size > self.data.len() {
1617 return None;
1618 }
1619 let frame = self.data[self.pos..self.pos + self.frame_size].to_vec();
1620 self.pos += self.frame_size;
1621 self.frames_read += 1;
1622 Some(frame)
1623 }
1624 pub fn read_all<F: FnMut(&[u8])>(&mut self, mut callback: F) {
1626 while let Some(frame) = self.read_frame() {
1627 callback(&frame);
1628 }
1629 }
1630 pub fn seek(&mut self, pos: usize) {
1632 self.pos = pos.min(self.data.len());
1633 }
1634 pub fn remaining(&self) -> usize {
1636 self.data.len().saturating_sub(self.pos)
1637 }
1638 pub fn append_data(&mut self, more: &[u8]) {
1640 self.data.extend_from_slice(more);
1641 }
1642}