Skip to main content

oxihuman_export/
realtime_stream.rs

1// Copyright (C) 2026 COOLJAPAN OU (Team KitaSan)
2// SPDX-License-Identifier: Apache-2.0
3
4//! Real-time mesh streaming session with delta/quantized encoding (WebSocket stub).
5
6// NOTE: `StreamFormat` is also defined in `streaming_export`. We use a distinct
7// type here scoped to this module. The pub re-export in lib.rs must alias one of them.
8
9#[allow(dead_code)]
10/// One streaming frame of mesh data.
11#[derive(Debug, Clone)]
12pub struct StreamFrame {
13    pub frame_id: u64,
14    pub timestamp_ms: u64,
15    pub positions: Vec<[f32; 3]>,
16    pub normals: Vec<[f32; 3]>,
17}
18
19#[allow(dead_code)]
20/// Compression settings.
21#[derive(Debug, Clone)]
22pub struct StreamCompression {
23    /// "none", "delta", or "quantized"
24    pub kind: String,
25    pub level: u8,
26}
27
28#[allow(dead_code)]
29/// Wire format settings.
30#[derive(Debug, Clone)]
31pub struct RtStreamFormat {
32    /// "json", "binary", or "msgpack-stub"
33    pub kind: String,
34}
35
36#[allow(dead_code)]
37/// Configuration for a streaming session.
38#[derive(Debug, Clone)]
39pub struct StreamConfig {
40    pub target_fps: f32,
41    pub compression: StreamCompression,
42    pub format: RtStreamFormat,
43}
44
45#[allow(dead_code)]
46/// An active streaming session accumulating frames.
47pub struct StreamSession {
48    pub config: StreamConfig,
49    pub frames: Vec<StreamFrame>,
50    pub base_frame: Option<StreamFrame>,
51    next_id: u64,
52    time_ms: u64,
53}
54
55impl StreamSession {
56    /// Create a new session.
57    pub fn new(config: StreamConfig) -> Self {
58        StreamSession {
59            config,
60            frames: Vec::new(),
61            base_frame: None,
62            next_id: 0,
63            time_ms: 0,
64        }
65    }
66
67    /// Push a new frame into the session.
68    pub fn push_frame(&mut self, positions: Vec<[f32; 3]>, normals: Vec<[f32; 3]>) {
69        let frame = StreamFrame {
70            frame_id: self.next_id,
71            timestamp_ms: self.time_ms,
72            positions,
73            normals,
74        };
75        if self.next_id == 0 {
76            self.base_frame = Some(frame.clone());
77        }
78        self.frames.push(frame);
79        self.next_id += 1;
80        let dt_ms = (1000.0 / self.config.target_fps.max(1.0)) as u64;
81        self.time_ms += dt_ms;
82    }
83
84    /// Encode a stored frame by index based on session compression config.
85    pub fn encode_frame(&self, idx: usize) -> Vec<u8> {
86        let frame = match self.frames.get(idx) {
87            Some(f) => f,
88            None => return vec![],
89        };
90        match self.config.compression.kind.as_str() {
91            "delta" => {
92                let base = self
93                    .base_frame
94                    .as_ref()
95                    .map(|b| b.positions.as_slice())
96                    .unwrap_or(&[]);
97                let deltas = delta_encode_positions(base, &frame.positions);
98                encode_frame_binary(frame.frame_id, frame.timestamp_ms, &deltas, &frame.normals)
99            }
100            "quantized" => {
101                let (bmin, bmax) = positions_bounds(&frame.positions);
102                let q = quantize_positions_16bit(&frame.positions, bmin, bmax);
103                encode_frame_quantized(frame.frame_id, frame.timestamp_ms, &q, bmin, bmax)
104            }
105            _ => encode_frame_binary(
106                frame.frame_id,
107                frame.timestamp_ms,
108                &frame.positions,
109                &frame.normals,
110            ),
111        }
112    }
113
114    /// Decode a frame from bytes.
115    pub fn decode_frame(data: &[u8]) -> Option<StreamFrame> {
116        decode_frame_binary(data)
117    }
118
119    /// Number of frames in this session.
120    pub fn frame_count(&self) -> usize {
121        self.frames.len()
122    }
123
124    /// Average encoded size (bytes) over all frames.
125    pub fn avg_frame_size(&self) -> f32 {
126        if self.frames.is_empty() {
127            return 0.0;
128        }
129        let total: usize = (0..self.frames.len())
130            .map(|i| self.encode_frame(i).len())
131            .sum();
132        total as f32 / self.frames.len() as f32
133    }
134}
135
136// ---------------------------------------------------------------------------
137// Binary frame encoding (simple: header + raw f32 array)
138// ---------------------------------------------------------------------------
139// Layout: magic(4) | frame_id(8) | timestamp_ms(8) | n_pos(4) | pos(n*12) | n_norm(4) | norm(n*12)
140
141const FRAME_MAGIC: &[u8; 4] = b"OXSF";
142
143fn encode_frame_binary(
144    frame_id: u64,
145    timestamp_ms: u64,
146    positions: &[[f32; 3]],
147    normals: &[[f32; 3]],
148) -> Vec<u8> {
149    let mut out = Vec::new();
150    out.extend_from_slice(FRAME_MAGIC);
151    out.extend_from_slice(&frame_id.to_le_bytes());
152    out.extend_from_slice(&timestamp_ms.to_le_bytes());
153    let np = positions.len() as u32;
154    out.extend_from_slice(&np.to_le_bytes());
155    for p in positions {
156        for &v in p {
157            out.extend_from_slice(&v.to_le_bytes());
158        }
159    }
160    let nn = normals.len() as u32;
161    out.extend_from_slice(&nn.to_le_bytes());
162    for n in normals {
163        for &v in n {
164            out.extend_from_slice(&v.to_le_bytes());
165        }
166    }
167    out
168}
169
170fn decode_frame_binary(data: &[u8]) -> Option<StreamFrame> {
171    if data.len() < 24 {
172        return None;
173    }
174    if &data[..4] != FRAME_MAGIC && &data[..4] != b"OXQF" {
175        return None;
176    }
177    // Support both OXSF and OXQF (quantized) — but quantized decode is different
178    if &data[..4] == b"OXQF" {
179        return decode_frame_quantized(data);
180    }
181    let frame_id = u64::from_le_bytes(data[4..12].try_into().ok()?);
182    let timestamp_ms = u64::from_le_bytes(data[12..20].try_into().ok()?);
183    let np = u32::from_le_bytes(data[20..24].try_into().ok()?) as usize;
184    let pos_end = 24 + np * 12;
185    if data.len() < pos_end + 4 {
186        return None;
187    }
188    let mut positions = Vec::with_capacity(np);
189    for i in 0..np {
190        let off = 24 + i * 12;
191        let x = f32::from_le_bytes(data[off..off + 4].try_into().ok()?);
192        let y = f32::from_le_bytes(data[off + 4..off + 8].try_into().ok()?);
193        let z = f32::from_le_bytes(data[off + 8..off + 12].try_into().ok()?);
194        positions.push([x, y, z]);
195    }
196    let nn = u32::from_le_bytes(data[pos_end..pos_end + 4].try_into().ok()?) as usize;
197    let norm_end = pos_end + 4 + nn * 12;
198    if data.len() < norm_end {
199        return None;
200    }
201    let mut normals = Vec::with_capacity(nn);
202    for i in 0..nn {
203        let off = pos_end + 4 + i * 12;
204        let x = f32::from_le_bytes(data[off..off + 4].try_into().ok()?);
205        let y = f32::from_le_bytes(data[off + 4..off + 8].try_into().ok()?);
206        let z = f32::from_le_bytes(data[off + 8..off + 12].try_into().ok()?);
207        normals.push([x, y, z]);
208    }
209    Some(StreamFrame {
210        frame_id,
211        timestamp_ms,
212        positions,
213        normals,
214    })
215}
216
217// ---------------------------------------------------------------------------
218// Quantized frame encoding
219// Layout: magic(4=OXQF) | frame_id(8) | ts(8) | bmin(12) | bmax(12) | n(4) | u16*3*n
220// ---------------------------------------------------------------------------
221
222fn encode_frame_quantized(
223    frame_id: u64,
224    timestamp_ms: u64,
225    quantized: &[[u16; 3]],
226    bmin: [f32; 3],
227    bmax: [f32; 3],
228) -> Vec<u8> {
229    let mut out = Vec::new();
230    out.extend_from_slice(b"OXQF");
231    out.extend_from_slice(&frame_id.to_le_bytes());
232    out.extend_from_slice(&timestamp_ms.to_le_bytes());
233    for &v in &bmin {
234        out.extend_from_slice(&v.to_le_bytes());
235    }
236    for &v in &bmax {
237        out.extend_from_slice(&v.to_le_bytes());
238    }
239    let n = quantized.len() as u32;
240    out.extend_from_slice(&n.to_le_bytes());
241    for q in quantized {
242        for &v in q {
243            out.extend_from_slice(&v.to_le_bytes());
244        }
245    }
246    out
247}
248
249fn decode_frame_quantized(data: &[u8]) -> Option<StreamFrame> {
250    // magic(4) + frame_id(8) + ts(8) + bmin(12) + bmax(12) + n(4) = 48 bytes header
251    if data.len() < 48 {
252        return None;
253    }
254    let frame_id = u64::from_le_bytes(data[4..12].try_into().ok()?);
255    let timestamp_ms = u64::from_le_bytes(data[12..20].try_into().ok()?);
256    let bmin = [
257        f32::from_le_bytes(data[20..24].try_into().ok()?),
258        f32::from_le_bytes(data[24..28].try_into().ok()?),
259        f32::from_le_bytes(data[28..32].try_into().ok()?),
260    ];
261    let bmax = [
262        f32::from_le_bytes(data[32..36].try_into().ok()?),
263        f32::from_le_bytes(data[36..40].try_into().ok()?),
264        f32::from_le_bytes(data[40..44].try_into().ok()?),
265    ];
266    let n = u32::from_le_bytes(data[44..48].try_into().ok()?) as usize;
267    if data.len() < 48 + n * 6 {
268        return None;
269    }
270    let mut quantized = Vec::with_capacity(n);
271    for i in 0..n {
272        let off = 48 + i * 6;
273        let x = u16::from_le_bytes(data[off..off + 2].try_into().ok()?);
274        let y = u16::from_le_bytes(data[off + 2..off + 4].try_into().ok()?);
275        let z = u16::from_le_bytes(data[off + 4..off + 6].try_into().ok()?);
276        quantized.push([x, y, z]);
277    }
278    let positions = dequantize_positions_16bit(&quantized, bmin, bmax);
279    Some(StreamFrame {
280        frame_id,
281        timestamp_ms,
282        positions,
283        normals: vec![],
284    })
285}
286
287// ---------------------------------------------------------------------------
288// Delta encoding
289// ---------------------------------------------------------------------------
290
291/// Per-vertex position delta: current - base.
292pub fn delta_encode_positions(base: &[[f32; 3]], current: &[[f32; 3]]) -> Vec<[f32; 3]> {
293    let n = current.len().min(base.len());
294    let mut out: Vec<[f32; 3]> = current
295        .iter()
296        .enumerate()
297        .map(|(i, c)| {
298            if i < n {
299                [c[0] - base[i][0], c[1] - base[i][1], c[2] - base[i][2]]
300            } else {
301                *c
302            }
303        })
304        .collect();
305    // If current is shorter than base, stop; if longer, keep extras as-is
306    out.truncate(current.len());
307    out
308}
309
310/// Reconstruct positions from base + delta.
311pub fn delta_decode_positions(base: &[[f32; 3]], delta: &[[f32; 3]]) -> Vec<[f32; 3]> {
312    delta
313        .iter()
314        .enumerate()
315        .map(|(i, d)| {
316            if i < base.len() {
317                [base[i][0] + d[0], base[i][1] + d[1], base[i][2] + d[2]]
318            } else {
319                *d
320            }
321        })
322        .collect()
323}
324
325// ---------------------------------------------------------------------------
326// 16-bit quantization
327// ---------------------------------------------------------------------------
328
329fn positions_bounds(positions: &[[f32; 3]]) -> ([f32; 3], [f32; 3]) {
330    let mut mn = [f32::INFINITY; 3];
331    let mut mx = [f32::NEG_INFINITY; 3];
332    for p in positions {
333        for d in 0..3 {
334            if p[d] < mn[d] {
335                mn[d] = p[d];
336            }
337            if p[d] > mx[d] {
338                mx[d] = p[d];
339            }
340        }
341    }
342    if mn[0].is_infinite() {
343        ([0.0; 3], [1.0; 3])
344    } else {
345        (mn, mx)
346    }
347}
348
349/// Quantize positions to 16-bit unsigned integers within given bounds.
350pub fn quantize_positions_16bit(
351    positions: &[[f32; 3]],
352    bounds_min: [f32; 3],
353    bounds_max: [f32; 3],
354) -> Vec<[u16; 3]> {
355    positions
356        .iter()
357        .map(|p| {
358            let mut q = [0u16; 3];
359            for d in 0..3 {
360                let range = (bounds_max[d] - bounds_min[d]).max(1e-9);
361                let t = ((p[d] - bounds_min[d]) / range).clamp(0.0, 1.0);
362                q[d] = (t * 65535.0) as u16;
363            }
364            q
365        })
366        .collect()
367}
368
369/// Dequantize 16-bit positions back to f32.
370pub fn dequantize_positions_16bit(
371    quantized: &[[u16; 3]],
372    bounds_min: [f32; 3],
373    bounds_max: [f32; 3],
374) -> Vec<[f32; 3]> {
375    quantized
376        .iter()
377        .map(|q| {
378            let mut p = [0.0f32; 3];
379            for d in 0..3 {
380                let range = bounds_max[d] - bounds_min[d];
381                p[d] = bounds_min[d] + (q[d] as f32 / 65535.0) * range;
382            }
383            p
384        })
385        .collect()
386}
387
388// Re-export the public types under the names the lib.rs re-export expects
389pub use RtStreamFormat as StreamFormat;
390
391// ---------------------------------------------------------------------------
392// Tests
393// ---------------------------------------------------------------------------
394
395#[cfg(test)]
396mod tests {
397    use super::*;
398
399    fn make_config(compression: &str) -> StreamConfig {
400        StreamConfig {
401            target_fps: 30.0,
402            compression: StreamCompression {
403                kind: compression.into(),
404                level: 1,
405            },
406            format: RtStreamFormat {
407                kind: "binary".into(),
408            },
409        }
410    }
411
412    fn sample_positions() -> Vec<[f32; 3]> {
413        vec![
414            [0.0, 0.0, 0.0],
415            [1.0, 0.0, 0.0],
416            [0.5, 1.0, 0.0],
417            [0.5, 0.5, 1.0],
418        ]
419    }
420
421    fn sample_normals() -> Vec<[f32; 3]> {
422        vec![[0.0, 0.0, 1.0]; 4]
423    }
424
425    #[test]
426    fn test_push_frame_increases_count() {
427        let mut session = StreamSession::new(make_config("none"));
428        assert_eq!(session.frame_count(), 0);
429        session.push_frame(sample_positions(), sample_normals());
430        assert_eq!(session.frame_count(), 1);
431        session.push_frame(sample_positions(), sample_normals());
432        assert_eq!(session.frame_count(), 2);
433    }
434
435    #[test]
436    fn test_encode_decode_round_trip_none() {
437        let mut session = StreamSession::new(make_config("none"));
438        session.push_frame(sample_positions(), sample_normals());
439        let encoded = session.encode_frame(0);
440        let decoded = StreamSession::decode_frame(&encoded).expect("decode failed");
441        assert_eq!(decoded.positions.len(), 4);
442        for (orig, dec) in sample_positions().iter().zip(decoded.positions.iter()) {
443            for d in 0..3 {
444                assert!((orig[d] - dec[d]).abs() < 1e-5);
445            }
446        }
447    }
448
449    #[test]
450    fn test_encode_decode_round_trip_quantized() {
451        let mut session = StreamSession::new(make_config("quantized"));
452        session.push_frame(sample_positions(), sample_normals());
453        let encoded = session.encode_frame(0);
454        let decoded = StreamSession::decode_frame(&encoded).expect("decode failed");
455        assert_eq!(decoded.positions.len(), 4);
456        // quantization error should be < 0.001 for range [0,1]
457        for (orig, dec) in sample_positions().iter().zip(decoded.positions.iter()) {
458            for d in 0..3 {
459                assert!(
460                    (orig[d] - dec[d]).abs() < 0.001,
461                    "quantization error too large"
462                );
463            }
464        }
465    }
466
467    #[test]
468    fn test_encode_out_of_range_returns_empty() {
469        let session = StreamSession::new(make_config("none"));
470        assert!(session.encode_frame(0).is_empty());
471    }
472
473    #[test]
474    fn test_frame_count_zero_initially() {
475        let session = StreamSession::new(make_config("none"));
476        assert_eq!(session.frame_count(), 0);
477    }
478
479    #[test]
480    fn test_avg_frame_size_zero_no_frames() {
481        let session = StreamSession::new(make_config("none"));
482        assert_eq!(session.avg_frame_size(), 0.0);
483    }
484
485    #[test]
486    fn test_avg_frame_size_positive() {
487        let mut session = StreamSession::new(make_config("none"));
488        session.push_frame(sample_positions(), sample_normals());
489        assert!(session.avg_frame_size() > 0.0);
490    }
491
492    #[test]
493    fn test_delta_encode_zero_for_identical() {
494        let pos = sample_positions();
495        let deltas = delta_encode_positions(&pos, &pos);
496        for d in &deltas {
497            for &v in d {
498                assert!(v.abs() < 1e-9, "expected zero delta");
499            }
500        }
501    }
502
503    #[test]
504    fn test_delta_encode_decode_recovers_original() {
505        let base = sample_positions();
506        let current: Vec<[f32; 3]> = base
507            .iter()
508            .map(|p| [p[0] + 0.1, p[1] - 0.2, p[2] + 0.05])
509            .collect();
510        let deltas = delta_encode_positions(&base, &current);
511        let recovered = delta_decode_positions(&base, &deltas);
512        for (orig, rec) in current.iter().zip(recovered.iter()) {
513            for d in 0..3 {
514                assert!((orig[d] - rec[d]).abs() < 1e-5);
515            }
516        }
517    }
518
519    #[test]
520    fn test_delta_empty_base() {
521        let current = sample_positions();
522        let deltas = delta_encode_positions(&[], &current);
523        assert_eq!(deltas.len(), current.len());
524    }
525
526    #[test]
527    fn test_quantize_stays_in_u16_range() {
528        let pos = vec![[-1.0f32, -1.0, -1.0], [1.0, 1.0, 1.0], [0.0, 0.5, -0.5]];
529        let bmin = [-1.0f32; 3];
530        let bmax = [1.0f32; 3];
531        let q = quantize_positions_16bit(&pos, bmin, bmax);
532        // All quantized values must be valid u16; min is 0, max is 65535.
533        // Check that the extreme input maps to 0 and 65535 correctly.
534        assert_eq!(q[0], [0u16, 0, 0]); // [-1,-1,-1] maps to 0
535        assert_eq!(q[1], [65535u16, 65535, 65535]); // [1,1,1] maps to 65535
536                                                    // Middle value should be in a reasonable midrange
537        assert!(q[2][1] > 0 && q[2][1] < 65535);
538    }
539
540    #[test]
541    fn test_quantize_dequantize_round_trip() {
542        let pos = vec![[0.0f32, 0.5, 1.0], [0.25, 0.75, 0.1]];
543        let bmin = [0.0f32; 3];
544        let bmax = [1.0f32; 3];
545        let q = quantize_positions_16bit(&pos, bmin, bmax);
546        let dq = dequantize_positions_16bit(&q, bmin, bmax);
547        for (orig, rec) in pos.iter().zip(dq.iter()) {
548            for d in 0..3 {
549                assert!((orig[d] - rec[d]).abs() < 0.0001);
550            }
551        }
552    }
553
554    #[test]
555    fn test_encode_delta_decode_frame_id() {
556        let mut session = StreamSession::new(make_config("none"));
557        session.push_frame(sample_positions(), sample_normals());
558        session.push_frame(sample_positions(), sample_normals());
559        let enc1 = session.encode_frame(1);
560        let dec = StreamSession::decode_frame(&enc1).expect("should succeed");
561        assert_eq!(dec.frame_id, 1);
562    }
563
564    #[test]
565    fn test_session_base_frame_set_after_first_push() {
566        let mut session = StreamSession::new(make_config("delta"));
567        assert!(session.base_frame.is_none());
568        session.push_frame(sample_positions(), sample_normals());
569        assert!(session.base_frame.is_some());
570    }
571}