Skip to main content

zerodds_dcps/
condition.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors
//! `Condition`-Hierarchie + `WaitSet` (DDS DCPS 1.4 §2.2.2.1.6).
//!
//! Conditions sind die DDS-Variante von "future-readiness": ein
//! `Condition`-Objekt traegt einen `trigger_value()`-Boolean, der
//! `true` wird, wenn ein bestimmtes Ereignis eintritt. Mehrere
//! Conditions werden in einem [`WaitSet`] gesammelt, der dann
//! mit [`WaitSet::wait`] blockiert, bis irgendeine Condition triggert
//! (oder ein Timeout erreicht ist).
//!
//! # Spec-Hierarchie
//!
//! - `Condition` (Base, nur trigger_value)
//!   - `StatusCondition` (siehe [`crate::entity::StatusCondition`])
//!   - `GuardCondition` (manuell setzbar, hier definiert)
//!   - `ReadCondition` (basiert auf SampleInfo-State, hier definiert)
//!   - `QueryCondition` (ReadCondition + SQL-Filter, hier definiert)
//!
//! Der Trait ist object-safe; WaitSet haelt `Arc<dyn Condition>`.
21
22extern crate alloc;
23
24use alloc::sync::Arc;
25use alloc::vec::Vec;
26use core::sync::atomic::{AtomicBool, Ordering};
27use core::time::Duration;
28
29#[cfg(feature = "std")]
30use std::sync::{Condvar, Mutex};
31
32use crate::error::{DdsError, Result};
33
34/// Condition-Trait — Spec §2.2.2.1.6 Base-Class.
35pub trait Condition: Send + Sync {
36    /// True wenn das Ereignis dieser Condition aktuell ansteht.
37    /// Spec §2.2.2.1.6 `get_trigger_value`.
38    fn get_trigger_value(&self) -> bool;
39}
40
41// Re-export der StatusCondition aus entity.rs damit Caller nur condition::*
42// importieren muessen.
43pub use crate::entity::StatusCondition;
44
45impl Condition for StatusCondition {
46    fn get_trigger_value(&self) -> bool {
47        self.trigger_value()
48    }
49}
50
/// `ReadCondition` — spec §2.2.2.5.8, trigger state per §2.2.4.5.
///
/// A read condition belongs to a DataReader and is meant to report `true`
/// while the reader holds samples that match all three masks
/// (`sample_state_mask`, `view_state_mask`, `instance_state_mask`).
///
/// Design note: the actual "does the reader have samples for these masks?"
/// query is injected by the caller as a closure, because the DataReader
/// sample cache cannot be queried in an object-safe way without further
/// infrastructure changes. The DCPS API consumer (usually
/// `DataReader::create_readcondition`) supplies a closure of the shape
/// `(sm, vm, im) -> bool`.
pub struct ReadCondition {
    sample_state_mask: u32,
    view_state_mask: u32,
    instance_state_mask: u32,
    trigger: Arc<dyn Fn(u32, u32, u32) -> bool + Send + Sync>,
}

impl core::fmt::Debug for ReadCondition {
    /// Prints the three masks. The trigger closure is left out and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut out = f.debug_struct("ReadCondition");
        out.field("sample_state_mask", &self.sample_state_mask);
        out.field("view_state_mask", &self.view_state_mask);
        out.field("instance_state_mask", &self.instance_state_mask);
        out.finish_non_exhaustive()
    }
}

impl ReadCondition {
    /// Builds a shared read condition from the three state masks and the
    /// closure that decides the trigger value.
    ///
    /// The closure is called with `(sample_state_mask, view_state_mask,
    /// instance_state_mask)` and returns the trigger value.
    #[must_use]
    pub fn new<F: Fn(u32, u32, u32) -> bool + Send + Sync + 'static>(
        sample_state_mask: u32,
        view_state_mask: u32,
        instance_state_mask: u32,
        trigger: F,
    ) -> Arc<Self> {
        let trigger: Arc<dyn Fn(u32, u32, u32) -> bool + Send + Sync> = Arc::new(trigger);
        Arc::new(Self {
            sample_state_mask,
            view_state_mask,
            instance_state_mask,
            trigger,
        })
    }

    /// Sample-state mask given at construction (spec §2.2.2.5.8
    /// `get_sample_state_mask`).
    #[must_use]
    pub fn get_sample_state_mask(&self) -> u32 {
        self.sample_state_mask
    }

    /// View-state mask given at construction (spec §2.2.2.5.8
    /// `get_view_state_mask`).
    #[must_use]
    pub fn get_view_state_mask(&self) -> u32 {
        self.view_state_mask
    }

    /// Instance-state mask given at construction (spec §2.2.2.5.8
    /// `get_instance_state_mask`).
    #[must_use]
    pub fn get_instance_state_mask(&self) -> u32 {
        self.instance_state_mask
    }
}
118
119impl Condition for ReadCondition {
120    fn get_trigger_value(&self) -> bool {
121        (self.trigger)(
122            self.sample_state_mask,
123            self.view_state_mask,
124            self.instance_state_mask,
125        )
126    }
127}
128
129/// `QueryCondition` — Spec §2.2.2.5.9. Erweitert ReadCondition um
130/// einen SQL-Filter-Ausdruck (DDS-DCPS Annex B). Der Filter wird
131/// pro Sample evaluiert (siehe [`Self::evaluate`]); der parse-Schritt
132/// passiert einmalig im Konstruktor.
133pub struct QueryCondition {
134    base: Arc<ReadCondition>,
135    query_expression: alloc::string::String,
136    query_parameters: Mutex<Vec<alloc::string::String>>,
137    parsed: zerodds_sql_filter::Expr,
138}
139
140impl core::fmt::Debug for QueryCondition {
141    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
142        f.debug_struct("QueryCondition")
143            .field("query_expression", &self.query_expression)
144            .field("base", &self.base)
145            .finish_non_exhaustive()
146    }
147}
148
149impl QueryCondition {
150    /// Konstruktor — Spec §2.2.2.5.2.5 `create_querycondition` /
151    /// §2.2.2.5.9. Der SQL-Ausdruck wird sofort geparst; eine syntaktisch
152    /// ungueltige Expression liefert `BadParameter`.
153    ///
154    /// # Errors
155    /// `BadParameter` wenn der SQL-Ausdruck nicht parst.
156    pub fn new(
157        base: Arc<ReadCondition>,
158        query_expression: impl Into<alloc::string::String>,
159        query_parameters: Vec<alloc::string::String>,
160    ) -> Result<Arc<Self>> {
161        let expr_str = query_expression.into();
162        let parsed = zerodds_sql_filter::parse(&expr_str).map_err(|_| DdsError::BadParameter {
163            what: "QueryCondition: invalid SQL filter expression",
164        })?;
165        Ok(Arc::new(Self {
166            base,
167            query_expression: expr_str,
168            query_parameters: Mutex::new(query_parameters),
169            parsed,
170        }))
171    }
172
173    /// Evaluiert den SQL-Filter gegen ein Sample. Spec §2.2.2.5.9.6 —
174    /// nur Samples mit `evaluate(...)==Ok(true)` zaehlen fuer das
175    /// `read_w_condition`/`take_w_condition`-Resultat.
176    ///
177    /// Parameter-Strings werden in `String`-`Value`s konvertiert; der
178    /// Caller kann typed Parameter ueber [`Self::evaluate_with_values`]
179    /// uebergeben.
180    ///
181    /// # Errors
182    /// Lock-Poisoning oder SQL-Eval-Error (UnknownField/TypeMismatch/
183    /// MissingParam).
184    pub fn evaluate<R: zerodds_sql_filter::RowAccess>(&self, row: &R) -> Result<bool> {
185        let params = self
186            .query_parameters
187            .lock()
188            .map_err(|_| DdsError::PreconditionNotMet {
189                reason: "query parameters poisoned",
190            })?;
191        let values: Vec<zerodds_sql_filter::Value> = params
192            .iter()
193            .map(|s| zerodds_sql_filter::Value::String(s.clone()))
194            .collect();
195        self.parsed
196            .evaluate(row, &values)
197            .map_err(|_| DdsError::PreconditionNotMet {
198                reason: "QueryCondition SQL evaluation failed",
199            })
200    }
201
202    /// Wie [`Self::evaluate`], aber mit explizit typed Parameter-Slice
203    /// (z.B. fuer Int/Float-Parameter, die als String-Cast nicht
204    /// matchen wuerden).
205    ///
206    /// # Errors
207    /// SQL-Eval-Error.
208    pub fn evaluate_with_values<R: zerodds_sql_filter::RowAccess>(
209        &self,
210        row: &R,
211        params: &[zerodds_sql_filter::Value],
212    ) -> Result<bool> {
213        self.parsed
214            .evaluate(row, params)
215            .map_err(|_| DdsError::PreconditionNotMet {
216                reason: "QueryCondition SQL evaluation failed",
217            })
218    }
219
220    /// Spec §2.2.2.5.9.4 `get_query_expression`.
221    #[must_use]
222    pub fn get_query_expression(&self) -> &str {
223        &self.query_expression
224    }
225
226    /// Spec §2.2.2.5.9.5 `get_query_parameters`.
227    #[must_use]
228    pub fn get_query_parameters(&self) -> Vec<alloc::string::String> {
229        self.query_parameters
230            .lock()
231            .map(|p| p.clone())
232            .unwrap_or_default()
233    }
234
235    /// Spec §2.2.2.5.9.6 `set_query_parameters`.
236    ///
237    /// # Errors
238    /// `PreconditionNotMet` bei Lock-Poisoning.
239    pub fn set_query_parameters(&self, params: Vec<alloc::string::String>) -> Result<()> {
240        let mut current =
241            self.query_parameters
242                .lock()
243                .map_err(|_| DdsError::PreconditionNotMet {
244                    reason: "query parameters poisoned",
245                })?;
246        *current = params;
247        Ok(())
248    }
249
250    /// Zugriff auf die Base-ReadCondition (Spec: QueryCondition
251    /// extends ReadCondition).
252    #[must_use]
253    pub fn base(&self) -> &Arc<ReadCondition> {
254        &self.base
255    }
256}
257
258impl Condition for QueryCondition {
259    fn get_trigger_value(&self) -> bool {
260        // Trigger erbt von der Base-ReadCondition (State-Mask-Match);
261        // die per-Sample-Filter-Evaluierung passiert in DataReader::
262        // read_w_condition/take_w_condition via [`Self::evaluate`].
263        self.base.get_trigger_value()
264    }
265}
266
/// `GuardCondition` — a condition the application triggers by hand
/// (spec §2.2.2.1.7).
///
/// Typical use: an application thread calls `set_trigger_value(true)` to
/// wake up a blocked WaitSet, e.g. to signal shutdown.
#[derive(Debug, Default)]
pub struct GuardCondition {
    triggered: AtomicBool,
}

impl GuardCondition {
    /// Creates a shared guard condition whose trigger value starts out
    /// `false`.
    #[must_use]
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            triggered: AtomicBool::new(false),
        })
    }

    /// Stores the new trigger value (spec §2.2.2.1.7 `set_trigger_value`).
    pub fn set_trigger_value(&self, value: bool) {
        let flag = &self.triggered;
        flag.store(value, Ordering::Release);
    }
}
288
289impl Condition for GuardCondition {
290    fn get_trigger_value(&self) -> bool {
291        self.triggered.load(Ordering::Acquire)
292    }
293}
294
/// `WaitSet` — spec §2.2.2.1.6.
///
/// A wait set collects 0..N `Arc<dyn Condition>` and blocks in
/// [`Self::wait`] until at least one of them triggers or a timeout is
/// reached. `wait` hands back the list of **triggered** conditions.
#[cfg(feature = "std")]
pub struct WaitSet {
    inner: Arc<WaitSetInner>,
}

/// State behind the `Arc` held by [`WaitSet`].
#[cfg(feature = "std")]
struct WaitSetInner {
    /// The attached conditions.
    conditions: Mutex<Vec<Arc<dyn Condition>>>,
    /// Condition variable for the poll-based wait. Conditions that set
    /// their trigger value could wake the wait set via `notify()`; at the
    /// moment the wait set only polls, because conditions are not forced
    /// to hold a back-reference to it.
    cvar: Condvar,
    /// Idle sleep between poll ticks (intended to be short enough for
    /// latency and long enough for low CPU load).
    poll_interval: Duration,
    /// Mutex paired with `cvar`.
    locked: Mutex<()>,
    /// `true` while a thread is inside `wait()`. Spec §2.2.2.1.6:
    /// "WaitSet MAY be used by only one application thread at a time"
    /// — concurrent `wait()` calls must yield `PreconditionNotMet`.
    waiting: AtomicBool,
}
325
/// Upper bound on the number of conditions in a single WaitSet, acting as
/// a DoS cap. Spec §2.2.2.1.6 defines no explicit limit; this is the
/// counterpart of a RESOURCE_LIMITS setting and guards against unbounded
/// growth in pathological cases.
pub const MAX_WAITSET_CONDITIONS: usize = 1 << 10;
330
#[cfg(feature = "std")]
impl WaitSet {
    /// Creates an empty wait set that polls its conditions every 1 ms.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::new(WaitSetInner {
                conditions: Mutex::new(Vec::new()),
                cvar: Condvar::new(),
                poll_interval: Duration::from_millis(1),
                locked: Mutex::new(()),
                waiting: AtomicBool::new(false),
            }),
        }
    }

    /// Locks the condition list, mapping lock poisoning to
    /// `PreconditionNotMet`.
    fn lock_conditions(&self) -> Result<std::sync::MutexGuard<'_, Vec<Arc<dyn Condition>>>> {
        self.inner
            .conditions
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "waitset conditions poisoned",
            })
    }

    /// Attaches a condition. Spec §2.2.2.1.6 `attach_condition`.
    ///
    /// Attaching the same `Arc` again is a no-op.
    ///
    /// # Errors
    /// * `PreconditionNotMet` if the internal lock is poisoned.
    /// * `OutOfResources` if [`MAX_WAITSET_CONDITIONS`] conditions are
    ///   already attached.
    pub fn attach_condition(&self, cond: Arc<dyn Condition>) -> Result<()> {
        let mut conds = self.lock_conditions()?;
        // Idempotent: the same Arc identity is never stored twice, and a
        // repeated attach does not count against the cap.
        if conds.iter().any(|c| Arc::ptr_eq(c, &cond)) {
            return Ok(());
        }
        // Spec §2.2.2.1.6 / §2.2.1.1.6 — DoS cap.
        if conds.len() >= MAX_WAITSET_CONDITIONS {
            return Err(DdsError::OutOfResources {
                what: "waitset condition count",
            });
        }
        conds.push(cond);
        Ok(())
    }

    /// Detaches a condition. Spec §2.2.2.1.6 `detach_condition`.
    ///
    /// Detaching a condition that is not attached is not an error.
    ///
    /// # Errors
    /// `PreconditionNotMet` if the internal lock is poisoned.
    pub fn detach_condition(&self, cond: &Arc<dyn Condition>) -> Result<()> {
        let mut conds = self.lock_conditions()?;
        conds.retain(|c| !Arc::ptr_eq(c, cond));
        Ok(())
    }

    /// Returns a snapshot of the currently attached conditions, in
    /// attachment order.
    ///
    /// # Errors
    /// `PreconditionNotMet` on lock poisoning.
    pub fn get_conditions(&self) -> Result<Vec<Arc<dyn Condition>>> {
        let conds = self.lock_conditions()?;
        Ok(conds.clone())
    }

    /// Blocks until at least one attached condition triggers or the
    /// timeout has elapsed, and returns the triggered conditions.
    /// Spec §2.2.2.1.6 `wait`.
    ///
    /// A timeout too large to be added to the current `Instant` (for
    /// example `Duration::MAX` used as "infinite") is treated as "no
    /// deadline" instead of panicking.
    ///
    /// **Implementation:** polling at the wait set's poll interval. A
    /// production-grade WaitSet would offer notify hooks to conditions;
    /// that is left as a follow-up optimisation.
    ///
    /// # Errors
    /// * [`DdsError::Timeout`] if no condition triggers within the timeout.
    /// * [`DdsError::PreconditionNotMet`] on lock poisoning, or if another
    ///   thread is already inside `wait()` on this wait set.
    pub fn wait(&self, timeout: Duration) -> Result<Vec<Arc<dyn Condition>>> {
        // Spec §2.2.2.1.6 — single-thread wait. The compare_exchange keeps
        // two threads from being inside `wait()` at the same time.
        if self
            .inner
            .waiting
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return Err(DdsError::PreconditionNotMet {
                reason: "waitset already in wait() — single-thread-wait per spec",
            });
        }
        // Resets the flag on every exit path, including unwinding out of a
        // panicking `get_trigger_value`.
        struct WaitGuard<'a>(&'a AtomicBool);
        impl Drop for WaitGuard<'_> {
            fn drop(&mut self) {
                self.0.store(false, Ordering::Release);
            }
        }
        let _guard = WaitGuard(&self.inner.waiting);

        // `Instant + Duration` panics on overflow; `None` means the
        // deadline is not representable and the wait is unbounded.
        let deadline = std::time::Instant::now().checked_add(timeout);
        loop {
            let active = self.poll_active()?;
            if !active.is_empty() {
                return Ok(active);
            }
            // Sleep until the next poll tick or the deadline, whichever
            // comes first.
            let sleep_for = match deadline {
                Some(deadline) => {
                    let remaining =
                        deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(DdsError::Timeout);
                    }
                    remaining.min(self.inner.poll_interval)
                }
                None => self.inner.poll_interval,
            };
            let cvlock = self
                .inner
                .locked
                .lock()
                .map_err(|_| DdsError::PreconditionNotMet {
                    reason: "waitset locked poisoned",
                })?;
            // Discarding the result is fine — the loop polls again at the
            // top regardless of why the sleep ended.
            let _ = self.inner.cvar.wait_timeout(cvlock, sleep_for);
        }
    }

    /// Returns the attached conditions whose trigger value is currently
    /// `true`, in attachment order.
    ///
    /// The trigger values are read *after* the condition lock has been
    /// released: `get_trigger_value` is caller-supplied code, so a panic in
    /// it must not poison the condition list, and a trigger that attaches
    /// or detaches on this same wait set must not deadlock.
    fn poll_active(&self) -> Result<Vec<Arc<dyn Condition>>> {
        let mut snapshot = self.get_conditions()?;
        snapshot.retain(|c| c.get_trigger_value());
        Ok(snapshot)
    }

    /// Wakes a blocked `wait()` even though no trigger value changed
    /// (e.g. for shutdown).
    ///
    /// This only has an effect while the waiting thread is sleeping between
    /// two polls; the woken `wait()` simply polls again. Callers should
    /// still attach a `GuardCondition` so that the wake-up has a definite
    /// result.
    pub fn notify(&self) {
        self.inner.cvar.notify_all();
    }
}
492
#[cfg(feature = "std")]
impl Default for WaitSet {
    /// Same as [`WaitSet::new`]: an empty wait set.
    fn default() -> Self {
        WaitSet::new()
    }
}
499
#[cfg(feature = "std")]
impl Clone for WaitSet {
    /// Produces another handle to the same inner state; the clone shares
    /// the attached conditions with the original.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
508
#[cfg(feature = "std")]
impl core::fmt::Debug for WaitSet {
    /// Prints only the number of attached conditions; a poisoned lock is
    /// shown as a count of 0.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let count = match self.inner.conditions.lock() {
            Ok(conds) => conds.len(),
            Err(_) => 0,
        };
        let mut out = f.debug_struct("WaitSet");
        out.field("conditions", &count);
        out.finish()
    }
}
516
#[cfg(test)]
#[cfg(feature = "std")]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Instant;

    #[test]
    fn guard_condition_starts_false() {
        let g = GuardCondition::new();
        assert!(!g.get_trigger_value());
    }

    #[test]
    fn guard_condition_set_trigger() {
        let g = GuardCondition::new();
        g.set_trigger_value(true);
        assert!(g.get_trigger_value());
        g.set_trigger_value(false);
        assert!(!g.get_trigger_value());
    }

    #[test]
    fn waitset_attach_detach_idempotent() {
        let ws = WaitSet::new();
        let g = GuardCondition::new();
        let cond: Arc<dyn Condition> = g.clone();
        ws.attach_condition(cond.clone()).unwrap();
        ws.attach_condition(cond.clone()).unwrap(); // idempotent
        assert_eq!(ws.get_conditions().unwrap().len(), 1);
        ws.detach_condition(&cond).unwrap();
        assert!(ws.get_conditions().unwrap().is_empty());
    }

    #[test]
    fn waitset_wait_returns_immediately_if_already_triggered() {
        let ws = WaitSet::new();
        let g = GuardCondition::new();
        g.set_trigger_value(true);
        let cond: Arc<dyn Condition> = g.clone();
        ws.attach_condition(cond).unwrap();
        let triggered = ws.wait(Duration::from_secs(1)).unwrap();
        assert_eq!(triggered.len(), 1);
    }

    #[test]
    fn waitset_wait_timeout_returns_err() {
        let ws = WaitSet::new();
        let g = GuardCondition::new();
        let cond: Arc<dyn Condition> = g.clone();
        ws.attach_condition(cond).unwrap();
        let start = Instant::now();
        let res = ws.wait(Duration::from_millis(50));
        assert!(matches!(res, Err(DdsError::Timeout)));
        // We waited for roughly 50 ms; the assertion allows some slack.
        assert!(start.elapsed() >= Duration::from_millis(40));
    }

    #[test]
    fn waitset_wakes_when_guard_triggers() {
        let ws = WaitSet::new();
        let g = GuardCondition::new();
        let cond: Arc<dyn Condition> = g.clone();
        ws.attach_condition(cond).unwrap();

        let g_clone = g.clone();
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            g_clone.set_trigger_value(true);
        });

        let start = Instant::now();
        let triggered = ws.wait(Duration::from_secs(2)).unwrap();
        let elapsed = start.elapsed();
        handle.join().unwrap();

        assert_eq!(triggered.len(), 1);
        // Should return quickly rather than after the full timeout.
        assert!(elapsed < Duration::from_millis(500), "elapsed={elapsed:?}");
    }

    #[test]
    fn waitset_with_multiple_conditions_returns_all_triggered() {
        let ws = WaitSet::new();
        let g1 = GuardCondition::new();
        let g2 = GuardCondition::new();
        ws.attach_condition(g1.clone()).unwrap();
        ws.attach_condition(g2.clone()).unwrap();
        g1.set_trigger_value(true);
        g2.set_trigger_value(true);
        let triggered = ws.wait(Duration::from_millis(100)).unwrap();
        assert_eq!(triggered.len(), 2);
    }

    #[test]
    fn status_condition_implements_condition_trait() {
        let state = crate::entity::EntityState::new();
        let sc = crate::entity::StatusCondition::new(state.clone());
        sc.set_enabled_statuses(0b0010);
        assert!(!sc.get_trigger_value());
        state.set_status_bits(0b0010);
        assert!(sc.get_trigger_value());
    }

    #[test]
    fn waitset_clone_shares_state() {
        let ws1 = WaitSet::new();
        let ws2 = ws1.clone();
        let g = GuardCondition::new();
        ws1.attach_condition(g.clone()).unwrap();
        // Both handles see the same condition (same inner state via Arc).
        assert_eq!(ws2.get_conditions().unwrap().len(), 1);
    }

    #[test]
    fn waitset_default_is_empty() {
        let ws = WaitSet::default();
        assert!(ws.get_conditions().unwrap().is_empty());
    }

    // ---- §2.2.2.1.6 Single-Thread-Wait + Resource-Limit ----

    #[test]
    fn waitset_concurrent_wait_returns_precondition_not_met() {
        // Spec §2.2.2.1.6 — a WaitSet may only be used by one thread at a
        // time. A second concurrent wait() call must return
        // PreconditionNotMet.
        let ws = WaitSet::new();
        // No condition attached → wait runs into the timeout.
        let ws_clone = ws.clone();
        let handle = thread::spawn(move || ws_clone.wait(Duration::from_millis(100)));
        // Short pause so that thread A gets to set the waiting flag.
        thread::sleep(Duration::from_millis(20));
        let res = ws.wait(Duration::from_millis(10));
        assert!(matches!(res, Err(DdsError::PreconditionNotMet { .. })));
        let _ = handle.join().unwrap();
    }

    #[test]
    fn waitset_sequential_waits_after_first_returns_succeed() {
        // Once the first wait() has finished (by timeout), a second,
        // sequential wait() must work again.
        let ws = WaitSet::new();
        let r1 = ws.wait(Duration::from_millis(10));
        assert!(matches!(r1, Err(DdsError::Timeout)));
        // Sequential: the second call after the first has returned goes
        // through.
        let r2 = ws.wait(Duration::from_millis(10));
        assert!(matches!(r2, Err(DdsError::Timeout)));
    }

    #[test]
    fn waitset_attach_above_max_returns_out_of_resources() {
        // §2.2.1.1.6 RC OUT_OF_RESOURCES — DoS cap.
        let ws = WaitSet::new();
        for _ in 0..MAX_WAITSET_CONDITIONS {
            let g = GuardCondition::new();
            ws.attach_condition(g).unwrap();
        }
        // The next attach must report OutOfResources.
        let g = GuardCondition::new();
        let res = ws.attach_condition(g);
        assert!(matches!(res, Err(DdsError::OutOfResources { .. })));
        assert_eq!(ws.get_conditions().unwrap().len(), MAX_WAITSET_CONDITIONS);
    }

    #[test]
    fn waitset_attach_idempotent_does_not_count_against_cap() {
        // The same Arc identity attached several times → no duplicate, no
        // growth of the condition count.
        let ws = WaitSet::new();
        let g = GuardCondition::new();
        let cond: Arc<dyn Condition> = g.clone();
        ws.attach_condition(cond.clone()).unwrap();
        ws.attach_condition(cond.clone()).unwrap();
        ws.attach_condition(cond).unwrap();
        assert_eq!(ws.get_conditions().unwrap().len(), 1);
    }

    // ---- §2.2.2.5.8 ReadCondition + §2.2.4.5 Trigger State ----

    #[test]
    fn read_condition_returns_trigger_from_closure() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        // The trigger closure returns true when all three masks are ANY.
        let cond = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |sm, vm, im| {
                sm == sample_state_mask::ANY
                    && vm == view_state_mask::ANY
                    && im == instance_state_mask::ANY
            },
        );
        assert_eq!(cond.get_sample_state_mask(), sample_state_mask::ANY);
        assert_eq!(cond.get_view_state_mask(), view_state_mask::ANY);
        assert_eq!(cond.get_instance_state_mask(), instance_state_mask::ANY);
        assert!(cond.get_trigger_value());
    }

    #[test]
    fn read_condition_trigger_false_when_closure_says_false() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        // The closure always returns false → no data available.
        let cond = ReadCondition::new(
            sample_state_mask::NOT_READ,
            view_state_mask::NEW,
            instance_state_mask::ALIVE,
            |_, _, _| false,
        );
        assert!(!cond.get_trigger_value());
    }

    #[test]
    fn read_condition_implements_condition_trait_object_safe() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        let cond = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| true,
        );
        let dyn_cond: Arc<dyn Condition> = cond;
        assert!(dyn_cond.get_trigger_value());
    }

    #[test]
    fn read_condition_attaches_to_waitset() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        // §2.2.4.5: ReadCondition used as a WaitSet trigger.
        let ws = WaitSet::new();
        let cond = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| true, // always triggered
        );
        ws.attach_condition(cond.clone()).unwrap();
        let triggered = ws.wait(Duration::from_millis(50)).unwrap();
        assert_eq!(triggered.len(), 1);
    }

    // ---- §2.2.2.5.9 QueryCondition ----

    #[test]
    fn query_condition_inherits_trigger_from_base() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        let base = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| true,
        );
        let qc = QueryCondition::new(base, "x > 10", alloc::vec::Vec::new()).unwrap();
        assert!(qc.get_trigger_value());
        assert_eq!(qc.get_query_expression(), "x > 10");
        assert!(qc.get_query_parameters().is_empty());
    }

    #[test]
    fn query_condition_set_query_parameters_roundtrip() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        let base = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| false,
        );
        let qc = QueryCondition::new(base, "color = %0", alloc::vec!["RED".into()]).unwrap();
        assert_eq!(qc.get_query_parameters(), alloc::vec!["RED".to_string()]);
        qc.set_query_parameters(alloc::vec!["BLUE".into(), "GREEN".into()])
            .unwrap();
        assert_eq!(
            qc.get_query_parameters(),
            alloc::vec!["BLUE".to_string(), "GREEN".to_string()]
        );
    }

    #[test]
    fn query_condition_trigger_inherits_false_from_base() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        let base = ReadCondition::new(
            sample_state_mask::READ,
            view_state_mask::NOT_NEW,
            instance_state_mask::NOT_ALIVE,
            |_, _, _| false, // no matching samples
        );
        let qc = QueryCondition::new(base, "x > 0", alloc::vec::Vec::new()).unwrap();
        assert!(!qc.get_trigger_value());
    }

    #[test]
    fn query_condition_base_returns_correct_read_condition() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        let base = ReadCondition::new(
            sample_state_mask::NOT_READ,
            view_state_mask::NEW,
            instance_state_mask::ALIVE,
            |_, _, _| true,
        );
        let qc = QueryCondition::new(base.clone(), "x > 0", alloc::vec::Vec::new()).unwrap();
        // base() returns the same Arc identity.
        assert!(Arc::ptr_eq(&base, qc.base()));
    }

    #[test]
    fn query_condition_rejects_invalid_sql() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        let base = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| true,
        );
        let r = QueryCondition::new(base, "x > >", alloc::vec::Vec::new());
        assert!(matches!(r, Err(DdsError::BadParameter { .. })));
    }

    #[test]
    fn query_condition_evaluate_filters_per_sample() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        use zerodds_sql_filter::{RowAccess, Value};
        // Minimal row exposing a single integer field `x`.
        struct R(i64);
        impl RowAccess for R {
            fn get(&self, p: &str) -> Option<Value> {
                if p == "x" {
                    Some(Value::Int(self.0))
                } else {
                    None
                }
            }
        }
        let base = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| true,
        );
        let qc = QueryCondition::new(base, "x > 10", alloc::vec::Vec::new()).unwrap();
        assert!(qc.evaluate(&R(42)).unwrap());
        assert!(!qc.evaluate(&R(5)).unwrap());
    }

    #[test]
    fn query_condition_evaluate_with_typed_values_int_param() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        use zerodds_sql_filter::{RowAccess, Value};
        // Minimal row exposing a single integer field `x`.
        struct R(i64);
        impl RowAccess for R {
            fn get(&self, p: &str) -> Option<Value> {
                if p == "x" {
                    Some(Value::Int(self.0))
                } else {
                    None
                }
            }
        }
        let base = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| true,
        );
        let qc = QueryCondition::new(base, "x > %0", alloc::vec::Vec::new()).unwrap();
        assert!(qc.evaluate_with_values(&R(42), &[Value::Int(10)]).unwrap());
        assert!(!qc.evaluate_with_values(&R(5), &[Value::Int(10)]).unwrap());
    }

    #[test]
    fn query_condition_evaluate_uses_string_params_by_default() {
        use crate::sample_info::{instance_state_mask, sample_state_mask, view_state_mask};
        use zerodds_sql_filter::{RowAccess, Value};
        // Minimal row exposing a single string field `color`.
        struct R(alloc::string::String);
        impl RowAccess for R {
            fn get(&self, p: &str) -> Option<Value> {
                if p == "color" {
                    Some(Value::String(self.0.clone()))
                } else {
                    None
                }
            }
        }
        let base = ReadCondition::new(
            sample_state_mask::ANY,
            view_state_mask::ANY,
            instance_state_mask::ANY,
            |_, _, _| true,
        );
        let qc = QueryCondition::new(base, "color = %0", alloc::vec!["RED".into()]).unwrap();
        assert!(qc.evaluate(&R("RED".into())).unwrap());
        assert!(!qc.evaluate(&R("BLUE".into())).unwrap());
    }

    #[test]
    fn waitset_panic_in_wait_releases_waiting_flag() {
        // If a caller aborts wait() through a panicking
        // Condition::get_trigger_value, the waiting flag must still be
        // released (RAII guard). Modelled as a drop test of the wait guard:
        // we simulate with a normal timeout return and check that a new
        // wait is possible afterwards (no hard lock).
        // NOTE(review): no panic is actually triggered here, so this only
        // covers the normal-return path of the guard — consider adding a
        // test with a panicking condition.
        let ws = WaitSet::new();
        let _ = ws.wait(Duration::from_millis(5));
        // A sequential wait is allowed → the waiting flag was reset when
        // the guard was dropped.
        let r = ws.wait(Duration::from_millis(5));
        assert!(matches!(r, Err(DdsError::Timeout)));
    }
}