1use std::collections::HashMap;
4
5use petgraph::stable_graph::{NodeIndex, StableDiGraph};
6use petgraph::visit::{EdgeRef, IntoEdgeReferences};
7use petgraph::Direction;
8use serde::{Deserialize, Serialize};
9use wifi_densepose_geo::types::GeoRegistration;
10
11use crate::error::WorldGraphError;
12use crate::model::{SemanticProvenance, WorldEdge, WorldId, WorldNode};
13
/// Schema version stamped onto every graph created by [`WorldGraph::new`] and
/// carried through snapshots in their `schema_version` field.
pub const SCHEMA_VERSION: u16 = 1;
16
/// Directed graph of [`WorldNode`]s connected by [`WorldEdge`]s, addressable
/// by stable [`WorldId`]s.
///
/// The graph itself lives in a petgraph `StableDiGraph`; `index` translates
/// the externally visible ids into petgraph node handles.
#[derive(Debug)]
pub struct WorldGraph {
    /// Underlying petgraph storage for node and edge weights.
    inner: StableDiGraph<WorldNode, WorldEdge>,
    /// Lookup from a node's [`WorldId`] to its handle in `inner`.
    index: HashMap<WorldId, NodeIndex>,
    /// Registration supplied at construction (or restored from a snapshot).
    registration: GeoRegistration,
    /// Numeric value of the next id handed out to a node that arrives unassigned.
    next_id: u64,
    /// Schema version recorded for this graph; copied into snapshots.
    schema_version: u16,
}
28
/// Serializable image of a [`WorldGraph`], as produced by
/// [`WorldGraph::snapshot`] and consumed by [`WorldGraph::from_json`].
///
/// Edges are stored by endpoint id rather than by petgraph handle.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WorldGraphSnapshot {
    /// Schema version of the graph the snapshot was taken from.
    schema_version: u16,
    /// Registration of the graph the snapshot was taken from.
    registration: GeoRegistration,
    /// Id-allocator position at snapshot time.
    next_id: u64,
    /// All node weights, in the graph's node iteration order.
    nodes: Vec<WorldNode>,
    /// All edges as `(source id, target id, weight)`.
    edges: Vec<(WorldId, WorldId, WorldEdge)>,
}
39
/// Summary returned by [`WorldGraph::apply_privacy_mode`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PrivacyRollup {
    /// The mode string the rollup was computed for.
    pub mode: String,
    /// Targets of denied pairs, each listed once, in first-denied order.
    pub suppressed_nodes: Vec<WorldId>,
    /// `(sensor, target)` id pairs for which the policy returned `false`.
    pub denied_pairs: Vec<(WorldId, WorldId)>,
    /// Number of pairs for which the policy returned `true`.
    pub allowed_pairs: usize,
}
52
53impl WorldGraph {
54 #[must_use]
56 pub fn new(registration: GeoRegistration) -> Self {
57 Self {
58 inner: StableDiGraph::new(),
59 index: HashMap::new(),
60 registration,
61 next_id: 1,
62 schema_version: SCHEMA_VERSION,
63 }
64 }
65
66 #[must_use]
68 pub fn registration(&self) -> &GeoRegistration {
69 &self.registration
70 }
71
72 #[must_use]
74 pub fn node_count(&self) -> usize {
75 self.inner.node_count()
76 }
77
78 pub fn upsert_node(&mut self, mut node: WorldNode) -> WorldId {
82 let id = if node.id().is_unassigned() {
83 let fresh = WorldId(self.next_id);
84 self.next_id += 1;
85 node.set_id(fresh);
86 fresh
87 } else {
88 self.next_id = self.next_id.max(node.id().0 + 1);
89 node.id()
90 };
91
92 if let Some(&idx) = self.index.get(&id) {
93 self.inner[idx] = node;
94 } else {
95 let idx = self.inner.add_node(node);
96 self.index.insert(id, idx);
97 }
98 id
99 }
100
101 pub fn add_edge(
106 &mut self,
107 from: WorldId,
108 to: WorldId,
109 edge: WorldEdge,
110 ) -> Result<(), WorldGraphError> {
111 let f = *self.index.get(&from).ok_or(WorldGraphError::UnknownNode(from))?;
112 let t = *self.index.get(&to).ok_or(WorldGraphError::UnknownNode(to))?;
113 self.inner.add_edge(f, t, edge);
114 Ok(())
115 }
116
117 #[must_use]
119 pub fn node(&self, id: WorldId) -> Option<&WorldNode> {
120 self.index.get(&id).map(|&idx| &self.inner[idx])
121 }
122
123 pub fn remove_node(&mut self, id: WorldId) -> Option<WorldNode> {
125 let idx = self.index.remove(&id)?;
126 self.inner.remove_node(idx)
127 }
128
129 pub fn neighbors(&self, id: WorldId) -> Vec<(WorldId, WorldEdge)> {
131 let Some(&idx) = self.index.get(&id) else {
132 return Vec::new();
133 };
134 self.inner
135 .edges_directed(idx, Direction::Outgoing)
136 .map(|e| (self.inner[e.target()].id(), e.weight().clone()))
137 .collect()
138 }
139
140 #[must_use]
142 pub fn room_for_area(&self, area_id: &str) -> Option<WorldId> {
143 self.inner.node_weights().find_map(|n| match n {
144 WorldNode::Room { id, area_id: Some(a), .. } if a == area_id => Some(*id),
145 _ => None,
146 })
147 }
148
149 #[must_use]
153 pub fn observed_by(&self, sensor: WorldId) -> Vec<WorldId> {
154 self.neighbors(sensor)
155 .into_iter()
156 .filter(|(_, e)| matches!(e, WorldEdge::Observes { .. }))
157 .map(|(id, _)| id)
158 .collect()
159 }
160
161 #[must_use]
163 pub fn contents_of(&self, container: WorldId) -> Vec<WorldId> {
164 let Some(&idx) = self.index.get(&container) else {
165 return Vec::new();
166 };
167 self.inner
168 .edges_directed(idx, Direction::Incoming)
169 .filter(|e| matches!(e.weight(), WorldEdge::LocatedIn { .. }))
170 .map(|e| self.inner[e.source()].id())
171 .collect()
172 }
173
174 pub fn add_semantic_state(
178 &mut self,
179 statement: String,
180 confidence: f32,
181 valid_from_unix_ms: i64,
182 provenance: SemanticProvenance,
183 evidence_sources: &[WorldId],
184 ) -> WorldId {
185 let evidence_handles = provenance.evidence.clone();
186 let id = self.upsert_node(WorldNode::SemanticState {
187 id: WorldId::UNASSIGNED,
188 statement,
189 confidence,
190 provenance,
191 valid_from_unix_ms,
192 });
193 for (src, handle) in evidence_sources.iter().zip(
194 evidence_handles
195 .iter()
196 .cloned()
197 .chain(std::iter::repeat(String::new())),
198 ) {
199 let _ = self.add_edge(id, *src, WorldEdge::DerivedFrom { evidence: handle });
200 }
201 id
202 }
203
204 pub fn add_contradiction(
210 &mut self,
211 a: WorldId,
212 b: WorldId,
213 magnitude: f32,
214 flag: String,
215 ) -> Result<(), WorldGraphError> {
216 self.add_edge(a, b, WorldEdge::Contradicts { magnitude, flag })
217 }
218
219 pub fn apply_privacy_mode<F>(&mut self, mode: &str, action: &str, policy: F) -> PrivacyRollup
226 where
227 F: Fn(&str, &str) -> bool,
228 {
229 let mut decisions: Vec<(WorldId, WorldId, bool)> = Vec::new();
231 for e in self.inner.edge_references() {
232 if matches!(e.weight(), WorldEdge::Observes { .. }) {
233 let sensor = &self.inner[e.source()];
234 let target = &self.inner[e.target()];
235 let allowed = policy(sensor.kind(), target.kind());
236 decisions.push((sensor.id(), target.id(), allowed));
237 }
238 }
239
240 let mut denied_pairs = Vec::new();
241 let mut suppressed = Vec::new();
242 let mut allowed_pairs = 0usize;
243 for (sensor, target, allowed) in &decisions {
244 let _ = self.add_edge(
245 *sensor,
246 *target,
247 WorldEdge::PrivacyLimitedBy {
248 mode: mode.to_string(),
249 action: action.to_string(),
250 allowed: *allowed,
251 },
252 );
253 if *allowed {
254 allowed_pairs += 1;
255 } else {
256 denied_pairs.push((*sensor, *target));
257 if !suppressed.contains(target) {
258 suppressed.push(*target);
259 }
260 }
261 }
262
263 PrivacyRollup {
264 mode: mode.to_string(),
265 suppressed_nodes: suppressed,
266 denied_pairs,
267 allowed_pairs,
268 }
269 }
270
271 #[must_use]
275 pub fn snapshot(&self) -> WorldGraphSnapshot {
276 let nodes: Vec<WorldNode> = self.inner.node_weights().cloned().collect();
277 let edges: Vec<(WorldId, WorldId, WorldEdge)> = self
278 .inner
279 .edge_references()
280 .map(|e| {
281 (
282 self.inner[e.source()].id(),
283 self.inner[e.target()].id(),
284 e.weight().clone(),
285 )
286 })
287 .collect();
288 WorldGraphSnapshot {
289 schema_version: self.schema_version,
290 registration: self.registration.clone(),
291 next_id: self.next_id,
292 nodes,
293 edges,
294 }
295 }
296
297 pub fn to_json(&self) -> Result<Vec<u8>, WorldGraphError> {
302 Ok(serde_json::to_vec(&self.snapshot())?)
303 }
304
305 pub fn from_json(bytes: &[u8]) -> Result<Self, WorldGraphError> {
310 let snap: WorldGraphSnapshot = serde_json::from_slice(bytes)?;
311 let mut g = Self::new(snap.registration);
312 g.schema_version = snap.schema_version;
313 for node in snap.nodes {
314 g.upsert_node(node);
315 }
316 for (from, to, edge) in snap.edges {
317 g.add_edge(from, to, edge)?;
318 }
319 g.next_id = snap.next_id;
320 Ok(g)
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{EnuPoint, SensorModality, WorldEdge, ZoneBoundsEnu};

    /// Builds an ENU point at the given east/north offsets with `up_m` of 0.
    fn enu(east: f64, north: f64) -> EnuPoint {
        EnuPoint { east_m: east, north_m: north, up_m: 0.0 }
    }

    /// Room fixture linked to area `living_room`; only id and name vary.
    fn room(id: WorldId, name: &'static str) -> WorldNode {
        WorldNode::Room {
            id,
            area_id: Some("living_room".into()),
            name: name.into(),
            bounds_enu: ZoneBoundsEnu::Rectangle { min_e: 0.0, min_n: 0.0, max_e: 5.0, max_n: 4.0 },
            floor: 0,
        }
    }

    /// The default room fixture, with an unassigned id.
    fn living_room() -> WorldNode {
        room(WorldId::UNASSIGNED, "Living Room")
    }

    /// Wi-Fi CSI sensor fixture with an unassigned id.
    fn wifi_sensor(device_id: &'static str, position: EnuPoint) -> WorldNode {
        WorldNode::Sensor {
            id: WorldId::UNASSIGNED,
            device_id: device_id.into(),
            position,
            modality: SensorModality::WifiCsi,
        }
    }

    /// Empty graph with the default registration.
    fn empty_graph() -> WorldGraph {
        WorldGraph::new(GeoRegistration::default())
    }

    #[test]
    fn upsert_allocates_and_replaces() {
        let mut graph = empty_graph();

        // First insert allocates an id.
        let id = graph.upsert_node(living_room());
        assert!(!id.is_unassigned());
        assert_eq!(graph.node_count(), 1);

        // Re-inserting under the same id replaces in place.
        graph.upsert_node(room(id, "Lounge"));
        assert_eq!(graph.node_count(), 1);
        let stored = graph.node(id);
        assert!(matches!(stored, Some(WorldNode::Room { name, .. }) if name == "Lounge"));
    }

    #[test]
    fn area_linkage_and_observability() {
        let mut graph = empty_graph();
        let room_id = graph.upsert_node(living_room());
        let sensor_id = graph.upsert_node(wifi_sensor("esp32-com9", enu(1.0, 1.0)));

        let observes = WorldEdge::Observes { quality: 0.9, last_seen_unix_ms: 1 };
        graph.add_edge(sensor_id, room_id, observes).unwrap();

        assert_eq!(graph.room_for_area("living_room"), Some(room_id));
        assert_eq!(graph.observed_by(sensor_id), vec![room_id]);
    }

    #[test]
    fn add_edge_unknown_endpoint_errors() {
        let mut graph = empty_graph();
        let room_id = graph.upsert_node(living_room());

        let observes = WorldEdge::Observes { quality: 1.0, last_seen_unix_ms: 0 };
        let outcome = graph.add_edge(room_id, WorldId(999), observes);

        assert!(matches!(outcome, Err(WorldGraphError::UnknownNode(WorldId(999)))));
    }

    #[test]
    fn location_query_contents_of() {
        let mut graph = empty_graph();
        let room_id = graph.upsert_node(living_room());
        let person_id = graph.upsert_node(WorldNode::PersonTrack {
            id: WorldId::UNASSIGNED,
            track_id: 7,
            last_position: enu(2.0, 2.0),
            reid_embedding_ref: None,
        });

        let located = WorldEdge::LocatedIn { since_unix_ms: 100 };
        graph.add_edge(person_id, room_id, located).unwrap();

        assert_eq!(graph.contents_of(room_id), vec![person_id]);
    }

    #[test]
    fn semantic_state_provenance_and_contradiction() {
        let mut graph = empty_graph();
        let event_id = graph.upsert_node(WorldNode::Event {
            id: WorldId::UNASSIGNED,
            event_type: "motion".into(),
            at_unix_ms: 10,
            located_in: None,
        });
        let provenance = SemanticProvenance {
            evidence: vec!["ev:abc".into()],
            model_version: "rfenc-1.0".into(),
            calibration_version: "cal:uuid".into(),
            privacy_decision: "PrivateHome/Allow".into(),
        };

        // The state must be linked back to its evidence.
        let present =
            graph.add_semantic_state("present".into(), 0.9, 11, provenance.clone(), &[event_id]);
        let derived_from_event = graph
            .neighbors(present)
            .iter()
            .any(|(to, e)| *to == event_id && matches!(e, WorldEdge::DerivedFrom { .. }));
        assert!(derived_from_event);

        // A contradiction links two states and leaves both in place.
        let absent = graph.add_semantic_state("absent".into(), 0.6, 12, provenance, &[event_id]);
        graph.add_contradiction(present, absent, 0.3, "flag:ts".into()).unwrap();
        assert!(graph.node(present).is_some());
        assert!(graph.node(absent).is_some());
        let contradicts = graph
            .neighbors(present)
            .iter()
            .any(|(_, e)| matches!(e, WorldEdge::Contradicts { .. }));
        assert!(contradicts);
    }

    #[test]
    fn privacy_rollup_suppresses_person_tracks() {
        let mut graph = empty_graph();
        let room_id = graph.upsert_node(living_room());
        let person_id = graph.upsert_node(WorldNode::PersonTrack {
            id: WorldId::UNASSIGNED,
            track_id: 1,
            last_position: enu(1.0, 1.0),
            reid_embedding_ref: None,
        });
        let sensor_id = graph.upsert_node(wifi_sensor("s", enu(0.0, 0.0)));
        for observed in [room_id, person_id] {
            let observes = WorldEdge::Observes { quality: 1.0, last_seen_unix_ms: 0 };
            graph.add_edge(sensor_id, observed, observes).unwrap();
        }

        let deny_person_tracks = |_modality: &str, node_kind: &str| node_kind != "person_track";
        let rollup =
            graph.apply_privacy_mode("StrictNoIdentity", "SuppressIdentity", deny_person_tracks);

        assert_eq!(rollup.allowed_pairs, 1);
        assert_eq!(rollup.denied_pairs, vec![(sensor_id, person_id)]);
        assert_eq!(rollup.suppressed_nodes, vec![person_id]);
    }

    #[test]
    fn json_roundtrip_preserves_nodes_and_edges() {
        let mut original = empty_graph();
        let room_id = original.upsert_node(living_room());
        let sensor_id = original.upsert_node(wifi_sensor("s", enu(0.0, 0.0)));
        let observes = WorldEdge::Observes { quality: 0.8, last_seen_unix_ms: 5 };
        original.add_edge(sensor_id, room_id, observes).unwrap();

        let encoded = original.to_json().unwrap();
        let restored = WorldGraph::from_json(&encoded).unwrap();

        assert_eq!(restored.node_count(), 2);
        assert_eq!(restored.room_for_area("living_room"), Some(room_id));
        assert_eq!(restored.observed_by(sensor_id), vec![room_id]);
        // Re-encoding the restored graph must reproduce the same bytes.
        assert_eq!(restored.to_json().unwrap(), encoded);
    }
}