Skip to main content

zerodds_corba_rt/
policy.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Priority-Model + Threadpools/Lanes + Priority-Banded-Connections
5//! (RT-CORBA §5.4.1, §5.7, §5.8).
6
7use alloc::vec::Vec;
8
9use crate::priority::Priority;
10
/// Priority-Model (RT-CORBA §5.4.1).
///
/// Selects whose priority a request runs at on the server. The variant is
/// evaluated by [`PriorityModelPolicy::effective_priority`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PriorityModel {
    /// `SERVER_DECLARED` — the server determines the execution priority.
    /// A priority propagated by the client is ignored under this model.
    ServerDeclared,
    /// `CLIENT_PROPAGATED` — the client priority travels along and applies at the server.
    /// If a request carries no propagated priority, the server-declared
    /// priority is used instead.
    ClientPropagated,
}
19
20/// `PriorityModelPolicy` (RT-CORBA §5.4.1): model + (with `SERVER_DECLARED`) the
21/// server-declared default priority. Advertised in the IOR as the TaggedComponent
22/// `TAG_RT_CORBA_PRIORITY_MODEL`.
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub struct PriorityModelPolicy {
25    /// The priority model.
26    pub model: PriorityModel,
27    /// Server-declared priority (relevant with `SERVER_DECLARED`).
28    pub server_priority: Priority,
29}
30
31impl PriorityModelPolicy {
32    /// The priority a request actually gets at the server: the
33    /// propagated client priority with `CLIENT_PROPAGATED`, otherwise the
34    /// server-declared one.
35    #[must_use]
36    pub fn effective_priority(&self, propagated: Option<Priority>) -> Priority {
37        match self.model {
38            PriorityModel::ClientPropagated => propagated.unwrap_or(self.server_priority),
39            PriorityModel::ServerDeclared => self.server_priority,
40        }
41    }
42}
43
/// A threadpool **lane** (RT-CORBA §5.7): serves requests of one priority with
/// a fixed plus dynamically growing number of threads.
///
/// This is a plain configuration record; [`Threadpool::lane_for`] picks a lane
/// by comparing its `priority` against the request priority.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Lane {
    /// Priority that this lane serves.
    pub priority: Priority,
    /// Statically maintained threads (counted by [`Threadpool::static_capacity`]).
    pub static_threads: u32,
    /// Maximum number of additional dynamically creatable threads.
    pub dynamic_threads: u32,
}
55
56/// A **threadpool** with lanes (RT-CORBA §5.7). Models structure + selection;
57/// the actual thread spawning is the responsibility of the runtime integration.
58#[derive(Debug, Clone)]
59pub struct Threadpool {
60    /// Lanes (one per served priority).
61    pub lanes: Vec<Lane>,
62    /// Stack size per thread in bytes (0 = platform default).
63    pub stacksize: usize,
64    /// Whether requests are buffered when a lane is full (instead of rejected).
65    pub allow_request_buffering: bool,
66    /// Max buffered requests (0 = unbounded when buffering is on).
67    pub max_buffered_requests: u32,
68}
69
70impl Threadpool {
71    /// Selects the lane for a request priority: the lane with the **highest
72    /// priority ≤ `priority`** (the most demanding one that still covers the
73    /// request). Returns `None` if no lane fits.
74    #[must_use]
75    pub fn lane_for(&self, priority: Priority) -> Option<&Lane> {
76        self.lanes
77            .iter()
78            .filter(|l| l.priority <= priority)
79            .max_by_key(|l| l.priority)
80            .or_else(|| self.lanes.iter().min_by_key(|l| l.priority))
81    }
82
83    /// Total static thread capacity across all lanes.
84    #[must_use]
85    pub fn static_capacity(&self) -> u32 {
86        self.lanes.iter().map(|l| l.static_threads).sum()
87    }
88}
89
/// `ThreadpoolPolicy` (RT-CORBA §5.7) — binds a threadpool to a POA.
///
/// A thin wrapper that owns the [`Threadpool`] configuration by value.
#[derive(Debug, Clone)]
pub struct ThreadpoolPolicy {
    /// The associated threadpool.
    pub pool: Threadpool,
}
96
97/// A priority band (RT-CORBA §5.8): an interval `[low, high]` to which a
98/// dedicated connection is assigned.
99#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100pub struct PriorityBand {
101    /// Lower band bound (inclusive).
102    pub low: Priority,
103    /// Upper band bound (inclusive).
104    pub high: Priority,
105}
106
107impl PriorityBand {
108    /// Whether `priority` falls into this band.
109    #[must_use]
110    pub fn contains(&self, priority: Priority) -> bool {
111        self.low <= priority && priority <= self.high
112    }
113}
114
115/// `PriorityBandedConnectionPolicy` (RT-CORBA §5.8): multiple bands, each with
116/// its own connection — requests travel over the connection of their band
117/// (prevents priority inversion on a shared connection).
118#[derive(Debug, Clone)]
119pub struct PriorityBandedConnectionPolicy {
120    /// The configured bands.
121    pub bands: Vec<PriorityBand>,
122}
123
124impl PriorityBandedConnectionPolicy {
125    /// Index of the band that covers `priority` (= connection selection).
126    #[must_use]
127    pub fn band_for(&self, priority: Priority) -> Option<usize> {
128        self.bands.iter().position(|b| b.contains(priority))
129    }
130}
131
#[cfg(test)]
// Unwrapping is fine in tests: a failed unwrap simply fails the test.
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Builds a [`Priority`] from a raw value; panics if `Priority::new` rejects it.
    fn prio(raw: i16) -> Priority {
        Priority::new(raw).unwrap()
    }

    /// Shorthand for a lane with the given priority and thread counts.
    fn lane(priority: i16, static_threads: u32, dynamic_threads: u32) -> Lane {
        Lane {
            priority: prio(priority),
            static_threads,
            dynamic_threads,
        }
    }

    /// Shorthand for the inclusive band `[low, high]`.
    fn band(low: i16, high: i16) -> PriorityBand {
        PriorityBand {
            low: prio(low),
            high: prio(high),
        }
    }

    #[test]
    fn client_propagated_uses_propagated_priority() {
        let policy = PriorityModelPolicy {
            model: PriorityModel::ClientPropagated,
            server_priority: prio(10),
        };

        // A propagated client priority takes precedence ...
        assert_eq!(policy.effective_priority(Some(prio(99))), prio(99));
        // ... and the server-declared priority is the fallback.
        assert_eq!(policy.effective_priority(None), prio(10));
    }

    #[test]
    fn server_declared_ignores_propagated() {
        let policy = PriorityModelPolicy {
            model: PriorityModel::ServerDeclared,
            server_priority: prio(10),
        };

        assert_eq!(policy.effective_priority(Some(prio(99))), prio(10));
    }

    #[test]
    fn lane_selection_picks_highest_covering() {
        let pool = Threadpool {
            lanes: alloc::vec![lane(0, 2, 0), lane(50, 4, 2), lane(90, 1, 0)],
            stacksize: 0,
            allow_request_buffering: true,
            max_buffered_requests: 0,
        };

        // (request priority, priority of the lane expected to serve it)
        let expectations: [(i16, i16); 3] = [(60, 50), (90, 90), (10, 0)];
        for &(request, expected_lane) in &expectations {
            let chosen = pool.lane_for(prio(request)).unwrap();
            assert_eq!(chosen.priority, prio(expected_lane));
        }

        assert_eq!(pool.static_capacity(), 7);
    }

    #[test]
    fn priority_band_selection() {
        let policy = PriorityBandedConnectionPolicy {
            bands: alloc::vec![band(0, 32), band(33, 66), band(67, 99)],
        };

        assert_eq!(policy.band_for(prio(10)), Some(0));
        assert_eq!(policy.band_for(prio(50)), Some(1));
        assert_eq!(policy.band_for(prio(80)), Some(2));
    }
}