1#![cfg_attr(not(feature = "std"), no_std)]
30#![deny(unsafe_code)]
31#![warn(missing_docs)]
32
33extern crate alloc;
34
35use alloc::collections::{BTreeMap, VecDeque};
36use alloc::vec::Vec;
37
38use alloc::string::String;
39
40use zerodds_xrce::header::{CLIENT_KEY_LEN, ClientKey};
41use zerodds_xrce::object_id::ObjectId;
42use zerodds_xrce::object_repr::ObjectVariant;
43use zerodds_xrce::object_store::{CreateOutcome, CreationMode, ObjectStore};
44
// Raw byte-array form of a `ClientKey` (the value `ord_of` returns); this is
// the key type of the agent's `BTreeMap`s and the type of
// `TraceEvent::client_key`.
type ClientKeyOrd = [u8; CLIENT_KEY_LEN];
49
50fn ord_of(key: ClientKey) -> ClientKeyOrd {
51 key.0
52}
53
/// Errors reported by the [`XrceAgent`] operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentError {
    /// The supplied client key has not been registered with the agent.
    UnknownClient,
    /// `submit_sample` was called for an object id that does not exist in the
    /// client's object store.
    UnknownReader,
    /// The reader's pending-sample queue already holds the configured maximum
    /// number of samples.
    QueueFull,
    /// `create_object` could not derive the object kind from the object id,
    /// or the client's object store refused the creation.
    WireRejected,
}

impl core::fmt::Display for AgentError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let message = match self {
            Self::UnknownClient => "client key is not registered with the agent",
            Self::UnknownReader => "reader object does not exist for this client",
            Self::QueueFull => "pending-sample queue for the reader is full",
            Self::WireRejected => "object id or representation was rejected",
        };
        f.write_str(message)
    }
}

// Only available with the `std` feature: without it the crate is `no_std`
// and `std::error::Error` cannot be named.
#[cfg(feature = "std")]
impl std::error::Error for AgentError {}
67
/// One record handed to a [`TraceSink`] after the agent completed an
/// operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceEvent {
    /// Operation label; the agent emits `"CREATE"`, `"DELETE"`, `"SUBMIT"`
    /// and `"PULL"`.
    pub operation: String,
    /// Raw bytes of the client key the operation was performed for.
    pub client_key: ClientKeyOrd,
    /// Two-byte form of the affected object id (the result of
    /// `ObjectId::to_bytes`).
    pub object_id: [u8; 2],
}
82
/// Receiver for the [`TraceEvent`]s emitted by an [`XrceAgent`].
///
/// Install an implementation with [`XrceAgent::set_trace_sink`].
pub trait TraceSink {
    /// Called once for every traced operation with the event describing it.
    fn record(&mut self, event: TraceEvent);
}
89
/// In-memory XRCE agent state: one object store per registered client plus a
/// bounded FIFO queue of pending samples per (client, object id) pair.
pub struct XrceAgent {
    // Object store of every registered client, keyed by the raw client key.
    clients: BTreeMap<ClientKeyOrd, ObjectStore>,
    // Pending sample payloads, keyed by (raw client key, two-byte object id).
    samples: BTreeMap<(ClientKeyOrd, [u8; 2]), VecDeque<Vec<u8>>>,
    // Upper bound on the length of each individual sample queue.
    max_pending_samples: usize,
    // Optional observer notified about CREATE / DELETE / SUBMIT / PULL.
    trace_sink: Option<alloc::boxed::Box<dyn TraceSink + Send>>,
}
101
// Converts an object id into the two-byte form used as part of the
// sample-queue map key and as `TraceEvent::object_id`.
fn oid_key(oid: ObjectId) -> [u8; 2] {
    oid.to_bytes()
}
105
106impl XrceAgent {
107 #[must_use]
109 pub fn new() -> Self {
110 Self::with_max_pending_samples(256)
111 }
112
113 #[must_use]
115 pub fn with_max_pending_samples(max: usize) -> Self {
116 Self {
117 clients: BTreeMap::new(),
118 samples: BTreeMap::new(),
119 max_pending_samples: max,
120 trace_sink: None,
121 }
122 }
123
124 pub fn set_trace_sink(&mut self, sink: alloc::boxed::Box<dyn TraceSink + Send>) {
128 self.trace_sink = Some(sink);
129 }
130
131 fn trace(&mut self, op: &str, client_key: ClientKeyOrd, oid: [u8; 2]) {
132 if let Some(sink) = self.trace_sink.as_mut() {
133 sink.record(TraceEvent {
134 operation: String::from(op),
135 client_key,
136 object_id: oid,
137 });
138 }
139 }
140
141 pub fn register_client(&mut self, client_key: ClientKey) {
145 self.clients.entry(ord_of(client_key)).or_default();
146 }
147
148 #[must_use]
150 pub fn has_client(&self, client_key: ClientKey) -> bool {
151 self.clients.contains_key(&ord_of(client_key))
152 }
153
154 #[must_use]
156 pub fn client_count(&self) -> usize {
157 self.clients.len()
158 }
159
160 pub fn create_object(
165 &mut self,
166 client_key: ClientKey,
167 object_id: ObjectId,
168 representation: ObjectVariant,
169 mode: CreationMode,
170 ) -> Result<CreateOutcome, AgentError> {
171 let ord = ord_of(client_key);
172 let store = self
173 .clients
174 .get_mut(&ord)
175 .ok_or(AgentError::UnknownClient)?;
176 let kind = object_id.kind().map_err(|_| AgentError::WireRejected)?;
177 let outcome = store
178 .create(object_id, kind, representation, mode)
179 .map_err(|_| AgentError::WireRejected)?;
180 self.trace("CREATE", ord, oid_key(object_id));
181 Ok(outcome)
182 }
183
184 pub fn delete_object(
190 &mut self,
191 client_key: ClientKey,
192 object_id: ObjectId,
193 ) -> Result<bool, AgentError> {
194 let ord = ord_of(client_key);
195 let store = self
196 .clients
197 .get_mut(&ord)
198 .ok_or(AgentError::UnknownClient)?;
199 let removed = store.delete(object_id);
200 self.samples.remove(&(ord, oid_key(object_id)));
202 self.trace("DELETE", ord, oid_key(object_id));
203 Ok(removed)
204 }
205
206 pub fn submit_sample(
212 &mut self,
213 client_key: ClientKey,
214 reader_id: ObjectId,
215 payload: Vec<u8>,
216 ) -> Result<(), AgentError> {
217 let ord = ord_of(client_key);
218 let store = self.clients.get(&ord).ok_or(AgentError::UnknownClient)?;
219 if store.get(reader_id).is_none() {
220 return Err(AgentError::UnknownReader);
221 }
222 let queue = self.samples.entry((ord, oid_key(reader_id))).or_default();
223 if queue.len() >= self.max_pending_samples {
224 return Err(AgentError::QueueFull);
225 }
226 queue.push_back(payload);
227 self.trace("SUBMIT", ord, oid_key(reader_id));
228 Ok(())
229 }
230
231 pub fn pull_sample(
238 &mut self,
239 client_key: ClientKey,
240 reader_id: ObjectId,
241 ) -> Result<Option<Vec<u8>>, AgentError> {
242 let ord = ord_of(client_key);
243 if !self.clients.contains_key(&ord) {
244 return Err(AgentError::UnknownClient);
245 }
246 let sample = self
247 .samples
248 .get_mut(&(ord, oid_key(reader_id)))
249 .and_then(VecDeque::pop_front);
250 if sample.is_some() {
251 self.trace("PULL", ord, oid_key(reader_id));
252 }
253 Ok(sample)
254 }
255
256 #[must_use]
258 pub fn pending_samples(&self, client_key: ClientKey, reader_id: ObjectId) -> usize {
259 self.samples
260 .get(&(ord_of(client_key), oid_key(reader_id)))
261 .map_or(0, VecDeque::len)
262 }
263}
264
impl Default for XrceAgent {
    /// Equivalent to [`XrceAgent::new`].
    fn default() -> Self {
        Self::new()
    }
}
270
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use zerodds_xrce::header::CLIENT_KEY_LEN;
    use zerodds_xrce::object_kind::{OBJK_DATAREADER, ObjectKind};

    // Client key whose bytes are all `b`.
    fn key(b: u8) -> ClientKey {
        ClientKey([b; CLIENT_KEY_LEN])
    }

    // Data-reader object id built from the raw id `raw`.
    fn reader_id(raw: u16) -> ObjectId {
        let reader_kind = ObjectKind::from_u8(OBJK_DATAREADER).unwrap();
        ObjectId::new(raw, reader_kind).unwrap()
    }

    // Issues a by-reference CREATE with the default creation mode and hands
    // the raw result back so each test can apply its own expectation.
    fn create_by_ref(
        agent: &mut XrceAgent,
        client: ClientKey,
        oid: ObjectId,
        name: &'static str,
    ) -> Result<CreateOutcome, AgentError> {
        agent.create_object(
            client,
            oid,
            ObjectVariant::ByReference(name.into()),
            CreationMode::default(),
        )
    }

    #[test]
    fn agent_starts_empty() {
        let agent = XrceAgent::new();
        assert_eq!(agent.client_count(), 0);
    }

    #[test]
    fn register_client_idempotent() {
        let client = key(0x01);
        let mut agent = XrceAgent::new();
        // Registering the same key twice must still yield a single client.
        for _ in 0..2 {
            agent.register_client(client);
        }
        assert!(agent.has_client(client));
        assert_eq!(agent.client_count(), 1);
    }

    #[test]
    fn create_object_for_unknown_client_rejected() {
        let mut agent = XrceAgent::new();
        let result = create_by_ref(&mut agent, key(0x99), reader_id(0x010), "r");
        let err = result.expect_err("unknown client");
        assert_eq!(err, AgentError::UnknownClient);
    }

    #[test]
    fn pull_sample_unknown_client_rejected() {
        let mut agent = XrceAgent::new();
        let result = agent.pull_sample(key(0x99), reader_id(0x010));
        let err = result.expect_err("unknown");
        assert_eq!(err, AgentError::UnknownClient);
    }

    #[test]
    fn client_pull_empty_returns_none() {
        let client = key(0x01);
        let mut agent = XrceAgent::new();
        agent.register_client(client);
        let pulled = agent.pull_sample(client, reader_id(0x010)).expect("ok");
        assert!(pulled.is_none());
    }

    #[test]
    fn after_submit_pull_returns_sample_in_fifo_order() {
        let client = key(0x01);
        let reader = reader_id(0x010);
        let mut agent = XrceAgent::new();
        agent.register_client(client);
        create_by_ref(&mut agent, client, reader, "R").expect("create");

        agent
            .submit_sample(client, reader, alloc::vec![1, 2, 3])
            .expect("s1");
        agent
            .submit_sample(client, reader, alloc::vec![4, 5, 6])
            .expect("s2");
        assert_eq!(agent.pending_samples(client, reader), 2);

        // Samples come back oldest first, then the queue is drained.
        let first = agent
            .pull_sample(client, reader)
            .expect("ok1")
            .expect("some1");
        assert_eq!(first, alloc::vec![1, 2, 3]);
        let second = agent
            .pull_sample(client, reader)
            .expect("ok2")
            .expect("some2");
        assert_eq!(second, alloc::vec![4, 5, 6]);
        assert!(agent.pull_sample(client, reader).expect("ok3").is_none());
    }

    #[test]
    fn submit_to_unknown_reader_rejected() {
        let client = key(0x01);
        let mut agent = XrceAgent::new();
        agent.register_client(client);
        let result = agent.submit_sample(client, reader_id(0x010), alloc::vec![0]);
        let err = result.expect_err("unknown reader");
        assert_eq!(err, AgentError::UnknownReader);
    }

    #[test]
    fn dos_cap_max_pending_samples_enforced() {
        let client = key(0x01);
        let reader = reader_id(0x010);
        let mut agent = XrceAgent::with_max_pending_samples(2);
        agent.register_client(client);
        create_by_ref(&mut agent, client, reader, "R").expect("create");

        agent
            .submit_sample(client, reader, alloc::vec![1])
            .expect("s1");
        agent
            .submit_sample(client, reader, alloc::vec![2])
            .expect("s2");
        // The third sample exceeds the cap of two.
        let err = agent
            .submit_sample(client, reader, alloc::vec![3])
            .expect_err("full");
        assert_eq!(err, AgentError::QueueFull);
    }

    #[test]
    fn delete_object_removes_pull_queue() {
        let client = key(0x01);
        let reader = reader_id(0x010);
        let mut agent = XrceAgent::new();
        agent.register_client(client);
        create_by_ref(&mut agent, client, reader, "R").expect("create");

        agent
            .submit_sample(client, reader, alloc::vec![1])
            .expect("s1");
        assert_eq!(agent.pending_samples(client, reader), 1);

        let removed = agent.delete_object(client, reader).expect("delete");
        assert!(removed);
        assert_eq!(agent.pending_samples(client, reader), 0);
    }

    #[test]
    fn trace_sink_captures_create_delete_submit_pull() {
        use alloc::sync::Arc;
        use std::sync::Mutex;

        // Sink that appends every event to a shared, lock-protected log.
        struct CaptureSink(Arc<Mutex<Vec<TraceEvent>>>);
        impl TraceSink for CaptureSink {
            fn record(&mut self, event: TraceEvent) {
                self.0.lock().unwrap().push(event);
            }
        }

        let client = key(0x01);
        let reader = reader_id(0x010);
        let log: Arc<Mutex<Vec<TraceEvent>>> = Arc::new(Mutex::new(Vec::new()));
        let mut agent = XrceAgent::new();
        agent.set_trace_sink(alloc::boxed::Box::new(CaptureSink(Arc::clone(&log))));
        agent.register_client(client);

        create_by_ref(&mut agent, client, reader, "R").expect("create");
        agent
            .submit_sample(client, reader, alloc::vec![1])
            .expect("submit");
        agent.pull_sample(client, reader).expect("pull");
        agent.delete_object(client, reader).expect("delete");

        let events = log.lock().unwrap();
        let ops: Vec<&str> = events
            .iter()
            .map(|event| event.operation.as_str())
            .collect();
        assert_eq!(ops, ["CREATE", "SUBMIT", "PULL", "DELETE"]);
        for event in events.iter() {
            assert_eq!(event.client_key, client.0);
            assert_eq!(event.object_id, reader.to_bytes());
        }
    }

    #[test]
    fn create_application_object_via_objk_application() {
        use zerodds_xrce::object_kind::OBJK_APPLICATION;

        let client = key(0x01);
        let mut agent = XrceAgent::new();
        agent.register_client(client);

        let app_kind = ObjectKind::from_u8(OBJK_APPLICATION).unwrap();
        let app_oid = ObjectId::new(0x100, app_kind).unwrap();
        let outcome = agent
            .create_object(
                client,
                app_oid,
                ObjectVariant::ByXmlString("<application name=\"App1\"/>".into()),
                CreationMode::default(),
            )
            .expect("create app");
        assert_eq!(outcome, CreateOutcome::Created);
    }

    #[test]
    fn agent_create_delete_latency_under_spec_floor() {
        use std::time::Instant;
        use zerodds_xrce::object_kind::OBJK_PARTICIPANT;

        let client = key(0x01);
        let mut agent = XrceAgent::new();
        agent.register_client(client);
        let participant_kind = ObjectKind::from_u8(OBJK_PARTICIPANT).unwrap();

        // Wall-clock budget check: 1000 create/delete round trips.
        let started = Instant::now();
        for raw in 0u16..1000 {
            let oid = ObjectId::new(raw, participant_kind).unwrap();
            create_by_ref(&mut agent, client, oid, "p").expect("create");
            agent.delete_object(client, oid).expect("delete");
        }
        let elapsed = started.elapsed();
        assert!(
            elapsed.as_millis() < 100,
            "1000 CREATE+DELETE-Ops dauerten {} ms (Spec-Floor 100ms)",
            elapsed.as_millis()
        );
    }

    #[test]
    fn multiple_clients_isolated() {
        let first_client = key(0x01);
        let second_client = key(0x02);
        let reader = reader_id(0x010);
        let mut agent = XrceAgent::new();
        agent.register_client(first_client);
        agent.register_client(second_client);

        // Both clients own a reader with the same object id.
        create_by_ref(&mut agent, first_client, reader, "R").expect("c1");
        create_by_ref(&mut agent, second_client, reader, "R").expect("c2");

        agent
            .submit_sample(first_client, reader, alloc::vec![100])
            .expect("s1");
        assert_eq!(agent.pending_samples(second_client, reader), 0);
        assert_eq!(agent.pending_samples(first_client, reader), 1);
    }
}