Skip to main content

wifi_densepose_worldgraph/
graph.rs

1//! ADR-139 §2.2–2.5 — graph container, provenance, privacy rollup, queries.
2
3use std::collections::HashMap;
4
5use petgraph::stable_graph::{NodeIndex, StableDiGraph};
6use petgraph::visit::{EdgeRef, IntoEdgeReferences};
7use petgraph::Direction;
8use serde::{Deserialize, Serialize};
9use wifi_densepose_geo::types::GeoRegistration;
10
11use crate::error::WorldGraphError;
12use crate::model::{SemanticProvenance, WorldEdge, WorldId, WorldNode};
13
14/// Current persisted schema version (ADR-136 §2.1 reserved-flag pattern).
15pub const SCHEMA_VERSION: u16 = 1;
16
17/// The typed environmental digital twin (ADR-139). Wraps a petgraph
18/// `StableDiGraph` and exposes a domain API; stable `WorldId → NodeIndex`
19/// mapping survives node removal.
20#[derive(Debug)]
21pub struct WorldGraph {
22    inner: StableDiGraph<WorldNode, WorldEdge>,
23    index: HashMap<WorldId, NodeIndex>,
24    registration: GeoRegistration,
25    next_id: u64,
26    schema_version: u16,
27}
28
29/// Serializable snapshot of a [`WorldGraph`] for RVF/JSON persistence.
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct WorldGraphSnapshot {
32    schema_version: u16,
33    registration: GeoRegistration,
34    next_id: u64,
35    nodes: Vec<WorldNode>,
36    /// Edges as (from_id, to_id, edge).
37    edges: Vec<(WorldId, WorldId, WorldEdge)>,
38}
39
40/// Result of a privacy-impact rollup (ADR-139 §2.4).
41#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
42pub struct PrivacyRollup {
43    /// Active mode name.
44    pub mode: String,
45    /// Nodes that become unobservable under this mode.
46    pub suppressed_nodes: Vec<WorldId>,
47    /// (sensor, node) pairs newly denied.
48    pub denied_pairs: Vec<(WorldId, WorldId)>,
49    /// Count of still-allowed (sensor, node) pairs.
50    pub allowed_pairs: usize,
51}
52
53impl WorldGraph {
54    /// Create an empty graph registered to an installation origin.
55    #[must_use]
56    pub fn new(registration: GeoRegistration) -> Self {
57        Self {
58            inner: StableDiGraph::new(),
59            index: HashMap::new(),
60            registration,
61            next_id: 1,
62            schema_version: SCHEMA_VERSION,
63        }
64    }
65
66    /// Installation geo-registration (ADR-044).
67    #[must_use]
68    pub fn registration(&self) -> &GeoRegistration {
69        &self.registration
70    }
71
72    /// Number of live nodes.
73    #[must_use]
74    pub fn node_count(&self) -> usize {
75        self.inner.node_count()
76    }
77
78    /// Insert or replace a node, returning its stable `WorldId`. If the node's
79    /// embedded id is `UNASSIGNED`, a fresh id is allocated; if it names an
80    /// existing id, that node's weight is replaced in place (upsert).
81    pub fn upsert_node(&mut self, mut node: WorldNode) -> WorldId {
82        let id = if node.id().is_unassigned() {
83            let fresh = WorldId(self.next_id);
84            self.next_id += 1;
85            node.set_id(fresh);
86            fresh
87        } else {
88            self.next_id = self.next_id.max(node.id().0 + 1);
89            node.id()
90        };
91
92        if let Some(&idx) = self.index.get(&id) {
93            self.inner[idx] = node;
94        } else {
95            let idx = self.inner.add_node(node);
96            self.index.insert(id, idx);
97        }
98        id
99    }
100
101    /// Add a typed edge between two known nodes.
102    ///
103    /// # Errors
104    /// [`WorldGraphError::UnknownNode`] if either endpoint is unknown.
105    pub fn add_edge(
106        &mut self,
107        from: WorldId,
108        to: WorldId,
109        edge: WorldEdge,
110    ) -> Result<(), WorldGraphError> {
111        let f = *self.index.get(&from).ok_or(WorldGraphError::UnknownNode(from))?;
112        let t = *self.index.get(&to).ok_or(WorldGraphError::UnknownNode(to))?;
113        self.inner.add_edge(f, t, edge);
114        Ok(())
115    }
116
117    /// Borrow a node by id.
118    #[must_use]
119    pub fn node(&self, id: WorldId) -> Option<&WorldNode> {
120        self.index.get(&id).map(|&idx| &self.inner[idx])
121    }
122
123    /// Remove a node and its incident edges (e.g. a person leaves).
124    pub fn remove_node(&mut self, id: WorldId) -> Option<WorldNode> {
125        let idx = self.index.remove(&id)?;
126        self.inner.remove_node(idx)
127    }
128
129    /// Outgoing neighbours of a node with the connecting edge.
130    pub fn neighbors(&self, id: WorldId) -> Vec<(WorldId, WorldEdge)> {
131        let Some(&idx) = self.index.get(&id) else {
132            return Vec::new();
133        };
134        self.inner
135            .edges_directed(idx, Direction::Outgoing)
136            .map(|e| (self.inner[e.target()].id(), e.weight().clone()))
137            .collect()
138    }
139
140    /// Resolve a HomeCore `area_id` to its Room node (entity linkage, ADR-127).
141    #[must_use]
142    pub fn room_for_area(&self, area_id: &str) -> Option<WorldId> {
143        self.inner.node_weights().find_map(|n| match n {
144            WorldNode::Room { id, area_id: Some(a), .. } if a == area_id => Some(*id),
145            _ => None,
146        })
147    }
148
149    // ---- ADR-139 §2.5 query API (v1) ----
150
151    /// Observability chain: which nodes a sensor currently `observes`.
152    #[must_use]
153    pub fn observed_by(&self, sensor: WorldId) -> Vec<WorldId> {
154        self.neighbors(sensor)
155            .into_iter()
156            .filter(|(_, e)| matches!(e, WorldEdge::Observes { .. }))
157            .map(|(id, _)| id)
158            .collect()
159    }
160
161    /// Location query: contents of a room/zone (incoming `located_in` edges).
162    #[must_use]
163    pub fn contents_of(&self, container: WorldId) -> Vec<WorldId> {
164        let Some(&idx) = self.index.get(&container) else {
165            return Vec::new();
166        };
167        self.inner
168            .edges_directed(idx, Direction::Incoming)
169            .filter(|e| matches!(e.weight(), WorldEdge::LocatedIn { .. }))
170            .map(|e| self.inner[e.source()].id())
171            .collect()
172    }
173
174    /// Append-with-provenance: insert a `SemanticState` and wire `DerivedFrom`
175    /// edges to its evidence sources (ADR-139 §2.3). Sources unknown to the
176    /// graph are skipped (evidence may be raw frames not modelled as nodes).
177    pub fn add_semantic_state(
178        &mut self,
179        statement: String,
180        confidence: f32,
181        valid_from_unix_ms: i64,
182        provenance: SemanticProvenance,
183        evidence_sources: &[WorldId],
184    ) -> WorldId {
185        let evidence_handles = provenance.evidence.clone();
186        let id = self.upsert_node(WorldNode::SemanticState {
187            id: WorldId::UNASSIGNED,
188            statement,
189            confidence,
190            provenance,
191            valid_from_unix_ms,
192        });
193        for (src, handle) in evidence_sources.iter().zip(
194            evidence_handles
195                .iter()
196                .cloned()
197                .chain(std::iter::repeat(String::new())),
198        ) {
199            let _ = self.add_edge(id, *src, WorldEdge::DerivedFrom { evidence: handle });
200        }
201        id
202    }
203
204    /// Record a contradiction between two still-live beliefs (ADR-139 §2.3).
205    /// Neither node is deleted — the disagreement stays queryable.
206    ///
207    /// # Errors
208    /// [`WorldGraphError::UnknownNode`] if either node is unknown.
209    pub fn add_contradiction(
210        &mut self,
211        a: WorldId,
212        b: WorldId,
213        magnitude: f32,
214        flag: String,
215    ) -> Result<(), WorldGraphError> {
216        self.add_edge(a, b, WorldEdge::Contradicts { magnitude, flag })
217    }
218
219    /// Recompute `PrivacyLimitedBy` edges for the active mode (ADR-139 §2.4).
220    ///
221    /// `policy(modality_kind, node_kind) -> allowed` decides, for each existing
222    /// `Observes` edge, whether the sensor may still observe the target under
223    /// `mode`. A matching `PrivacyLimitedBy` edge is appended recording the
224    /// decision; denied pairs are rolled up.
225    pub fn apply_privacy_mode<F>(&mut self, mode: &str, action: &str, policy: F) -> PrivacyRollup
226    where
227        F: Fn(&str, &str) -> bool,
228    {
229        // Collect (sensor, target, allowed) from current Observes edges.
230        let mut decisions: Vec<(WorldId, WorldId, bool)> = Vec::new();
231        for e in self.inner.edge_references() {
232            if matches!(e.weight(), WorldEdge::Observes { .. }) {
233                let sensor = &self.inner[e.source()];
234                let target = &self.inner[e.target()];
235                let allowed = policy(sensor.kind(), target.kind());
236                decisions.push((sensor.id(), target.id(), allowed));
237            }
238        }
239
240        let mut denied_pairs = Vec::new();
241        let mut suppressed = Vec::new();
242        let mut allowed_pairs = 0usize;
243        for (sensor, target, allowed) in &decisions {
244            let _ = self.add_edge(
245                *sensor,
246                *target,
247                WorldEdge::PrivacyLimitedBy {
248                    mode: mode.to_string(),
249                    action: action.to_string(),
250                    allowed: *allowed,
251                },
252            );
253            if *allowed {
254                allowed_pairs += 1;
255            } else {
256                denied_pairs.push((*sensor, *target));
257                if !suppressed.contains(target) {
258                    suppressed.push(*target);
259                }
260            }
261        }
262
263        PrivacyRollup {
264            mode: mode.to_string(),
265            suppressed_nodes: suppressed,
266            denied_pairs,
267            allowed_pairs,
268        }
269    }
270
271    // ---- Persistence (RVF/JSON) ----
272
273    /// Snapshot the graph for persistence.
274    #[must_use]
275    pub fn snapshot(&self) -> WorldGraphSnapshot {
276        let nodes: Vec<WorldNode> = self.inner.node_weights().cloned().collect();
277        let edges: Vec<(WorldId, WorldId, WorldEdge)> = self
278            .inner
279            .edge_references()
280            .map(|e| {
281                (
282                    self.inner[e.source()].id(),
283                    self.inner[e.target()].id(),
284                    e.weight().clone(),
285                )
286            })
287            .collect();
288        WorldGraphSnapshot {
289            schema_version: self.schema_version,
290            registration: self.registration.clone(),
291            next_id: self.next_id,
292            nodes,
293            edges,
294        }
295    }
296
297    /// Serialize to deterministic JSON bytes (RVF payload).
298    ///
299    /// # Errors
300    /// [`WorldGraphError::Serde`] on serialisation failure.
301    pub fn to_json(&self) -> Result<Vec<u8>, WorldGraphError> {
302        Ok(serde_json::to_vec(&self.snapshot())?)
303    }
304
305    /// Reconstruct a graph from a snapshot's JSON bytes.
306    ///
307    /// # Errors
308    /// [`WorldGraphError::Serde`] on parse failure.
309    pub fn from_json(bytes: &[u8]) -> Result<Self, WorldGraphError> {
310        let snap: WorldGraphSnapshot = serde_json::from_slice(bytes)?;
311        let mut g = Self::new(snap.registration);
312        g.schema_version = snap.schema_version;
313        for node in snap.nodes {
314            g.upsert_node(node);
315        }
316        for (from, to, edge) in snap.edges {
317            g.add_edge(from, to, edge)?;
318        }
319        g.next_id = snap.next_id;
320        Ok(g)
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use super::*;
327    use crate::model::{EnuPoint, SensorModality, WorldEdge, ZoneBoundsEnu};
328
329    fn enu(e: f64, n: f64) -> EnuPoint {
330        EnuPoint { east_m: e, north_m: n, up_m: 0.0 }
331    }
332
333    fn living_room() -> WorldNode {
334        WorldNode::Room {
335            id: WorldId::UNASSIGNED,
336            area_id: Some("living_room".into()),
337            name: "Living Room".into(),
338            bounds_enu: ZoneBoundsEnu::Rectangle { min_e: 0.0, min_n: 0.0, max_e: 5.0, max_n: 4.0 },
339            floor: 0,
340        }
341    }
342
343    #[test]
344    fn upsert_allocates_and_replaces() {
345        let mut g = WorldGraph::new(GeoRegistration::default());
346        let id = g.upsert_node(living_room());
347        assert!(!id.is_unassigned());
348        assert_eq!(g.node_count(), 1);
349        // Upsert same id with new name → replace in place, count unchanged.
350        g.upsert_node(WorldNode::Room {
351            id,
352            area_id: Some("living_room".into()),
353            name: "Lounge".into(),
354            bounds_enu: ZoneBoundsEnu::Rectangle { min_e: 0.0, min_n: 0.0, max_e: 5.0, max_n: 4.0 },
355            floor: 0,
356        });
357        assert_eq!(g.node_count(), 1);
358        assert!(matches!(g.node(id), Some(WorldNode::Room { name, .. }) if name == "Lounge"));
359    }
360
361    #[test]
362    fn area_linkage_and_observability() {
363        let mut g = WorldGraph::new(GeoRegistration::default());
364        let room = g.upsert_node(living_room());
365        let sensor = g.upsert_node(WorldNode::Sensor {
366            id: WorldId::UNASSIGNED,
367            device_id: "esp32-com9".into(),
368            position: enu(1.0, 1.0),
369            modality: SensorModality::WifiCsi,
370        });
371        g.add_edge(sensor, room, WorldEdge::Observes { quality: 0.9, last_seen_unix_ms: 1 })
372            .unwrap();
373
374        assert_eq!(g.room_for_area("living_room"), Some(room));
375        assert_eq!(g.observed_by(sensor), vec![room]);
376    }
377
378    #[test]
379    fn add_edge_unknown_endpoint_errors() {
380        let mut g = WorldGraph::new(GeoRegistration::default());
381        let room = g.upsert_node(living_room());
382        let err = g.add_edge(room, WorldId(999), WorldEdge::Observes { quality: 1.0, last_seen_unix_ms: 0 });
383        assert!(matches!(err, Err(WorldGraphError::UnknownNode(WorldId(999)))));
384    }
385
386    #[test]
387    fn location_query_contents_of() {
388        let mut g = WorldGraph::new(GeoRegistration::default());
389        let room = g.upsert_node(living_room());
390        let person = g.upsert_node(WorldNode::PersonTrack {
391            id: WorldId::UNASSIGNED,
392            track_id: 7,
393            last_position: enu(2.0, 2.0),
394            reid_embedding_ref: None,
395        });
396        g.add_edge(person, room, WorldEdge::LocatedIn { since_unix_ms: 100 }).unwrap();
397        assert_eq!(g.contents_of(room), vec![person]);
398    }
399
400    #[test]
401    fn semantic_state_provenance_and_contradiction() {
402        let mut g = WorldGraph::new(GeoRegistration::default());
403        let event = g.upsert_node(WorldNode::Event {
404            id: WorldId::UNASSIGNED,
405            event_type: "motion".into(),
406            at_unix_ms: 10,
407            located_in: None,
408        });
409        let prov = SemanticProvenance {
410            evidence: vec!["ev:abc".into()],
411            model_version: "rfenc-1.0".into(),
412            calibration_version: "cal:uuid".into(),
413            privacy_decision: "PrivateHome/Allow".into(),
414        };
415        let s1 = g.add_semantic_state("present".into(), 0.9, 11, prov.clone(), &[event]);
416        // DerivedFrom edge to the evidence event exists.
417        assert!(g.neighbors(s1).iter().any(|(to, e)| *to == event
418            && matches!(e, WorldEdge::DerivedFrom { .. })));
419
420        let s2 = g.add_semantic_state("absent".into(), 0.6, 12, prov, &[event]);
421        g.add_contradiction(s1, s2, 0.3, "flag:ts".into()).unwrap();
422        // Both beliefs retained; contradiction queryable.
423        assert!(g.node(s1).is_some() && g.node(s2).is_some());
424        assert!(g.neighbors(s1).iter().any(|(_, e)| matches!(e, WorldEdge::Contradicts { .. })));
425    }
426
427    #[test]
428    fn privacy_rollup_suppresses_person_tracks() {
429        let mut g = WorldGraph::new(GeoRegistration::default());
430        let room = g.upsert_node(living_room());
431        let person = g.upsert_node(WorldNode::PersonTrack {
432            id: WorldId::UNASSIGNED,
433            track_id: 1,
434            last_position: enu(1.0, 1.0),
435            reid_embedding_ref: None,
436        });
437        let sensor = g.upsert_node(WorldNode::Sensor {
438            id: WorldId::UNASSIGNED,
439            device_id: "s".into(),
440            position: enu(0.0, 0.0),
441            modality: SensorModality::WifiCsi,
442        });
443        g.add_edge(sensor, room, WorldEdge::Observes { quality: 1.0, last_seen_unix_ms: 0 }).unwrap();
444        g.add_edge(sensor, person, WorldEdge::Observes { quality: 1.0, last_seen_unix_ms: 0 }).unwrap();
445
446        // StrictNoIdentity: rooms observable, person_tracks suppressed.
447        let rollup = g.apply_privacy_mode("StrictNoIdentity", "SuppressIdentity", |_modality, node_kind| {
448            node_kind != "person_track"
449        });
450        assert_eq!(rollup.allowed_pairs, 1);
451        assert_eq!(rollup.denied_pairs, vec![(sensor, person)]);
452        assert_eq!(rollup.suppressed_nodes, vec![person]);
453    }
454
455    #[test]
456    fn json_roundtrip_preserves_nodes_and_edges() {
457        let mut g = WorldGraph::new(GeoRegistration::default());
458        let room = g.upsert_node(living_room());
459        let sensor = g.upsert_node(WorldNode::Sensor {
460            id: WorldId::UNASSIGNED,
461            device_id: "s".into(),
462            position: enu(0.0, 0.0),
463            modality: SensorModality::WifiCsi,
464        });
465        g.add_edge(sensor, room, WorldEdge::Observes { quality: 0.8, last_seen_unix_ms: 5 }).unwrap();
466
467        let bytes = g.to_json().unwrap();
468        let g2 = WorldGraph::from_json(&bytes).unwrap();
469        assert_eq!(g2.node_count(), 2);
470        assert_eq!(g2.room_for_area("living_room"), Some(room));
471        assert_eq!(g2.observed_by(sensor), vec![room]);
472        // Deterministic: re-serialising the reconstructed graph matches.
473        assert_eq!(g2.to_json().unwrap(), bytes);
474    }
475}