1use crate::controller::{Controller, ControllerSnapshot, Decision, Sample};
27use crate::measurement::{
28 MetadataOp, N_META_OPS, N_META_RESOURCES, ResourceKind, SampleSink, Side,
29};
30
31pub const DEFAULT_TICK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
33
34pub const DEFAULT_CHANNEL_CAPACITY: usize = 4096;
38
39pub struct ControlUnit<C: Controller> {
72 label: &'static str,
73 controller: C,
74 sample_rx: tokio::sync::mpsc::Receiver<Sample>,
75 decision_tx: tokio::sync::watch::Sender<Decision>,
76 snapshot_tx: tokio::sync::watch::Sender<ControllerSnapshot>,
77 tick_interval: std::time::Duration,
78}
79
80impl<C: Controller + 'static> ControlUnit<C> {
81 pub fn new(
93 label: &'static str,
94 controller: C,
95 sample_rx: tokio::sync::mpsc::Receiver<Sample>,
96 tick_interval: std::time::Duration,
97 ) -> (
98 Self,
99 tokio::sync::watch::Receiver<Decision>,
100 tokio::sync::watch::Receiver<ControllerSnapshot>,
101 ) {
102 let (decision_tx, decision_rx) = tokio::sync::watch::channel(Decision::UNLIMITED);
103 let (snapshot_tx, snapshot_rx) = tokio::sync::watch::channel(ControllerSnapshot::default());
104 (
105 Self {
106 label,
107 controller,
108 sample_rx,
109 decision_tx,
110 snapshot_tx,
111 tick_interval,
112 },
113 decision_rx,
114 snapshot_rx,
115 )
116 }
117
118 pub fn label(&self) -> &'static str {
121 self.label
122 }
123
124 pub fn spawn(self) -> tokio::task::JoinHandle<()> {
127 tokio::spawn(self.run())
128 }
129
130 pub async fn run(mut self) {
132 let mut interval = tokio::time::interval(self.tick_interval);
133 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
136 interval.tick().await;
137 let initial = self.controller.on_tick(std::time::Instant::now());
138 Self::log_tick(self.label, self.controller.name(), &initial, None);
139 let _ = self.decision_tx.send(initial);
140 let _ = self.snapshot_tx.send(self.controller.snapshot());
141 let mut last = initial;
142 loop {
143 tokio::select! {
144 _ = interval.tick() => {
145 let decision = self.controller.on_tick(std::time::Instant::now());
146 Self::log_tick(self.label, self.controller.name(), &decision, Some(&last));
147 let _ = self.decision_tx.send(decision);
148 let _ = self.snapshot_tx.send(self.controller.snapshot());
149 last = decision;
150 }
151 sample = self.sample_rx.recv() => {
152 match sample {
153 Some(s) => {
154 self.controller.on_sample(&s);
155 while let Ok(s) = self.sample_rx.try_recv() {
158 self.controller.on_sample(&s);
159 }
160 }
161 None => break,
162 }
163 }
164 }
165 }
166 tracing::debug!(
167 unit = %self.label,
168 controller = %self.controller.name(),
169 "control loop exiting: sample channel closed",
170 );
171 }
172
173 fn log_tick(
174 label: &'static str,
175 controller: &'static str,
176 decision: &Decision,
177 previous: Option<&Decision>,
178 ) {
179 tracing::trace!(
182 unit = %label,
183 controller = %controller,
184 max_in_flight = ?decision.max_in_flight,
185 rate_per_sec = ?decision.rate_per_sec,
186 "control tick",
187 );
188 if previous.is_none_or(|p| p != decision) {
189 tracing::debug!(
190 unit = %label,
191 controller = %controller,
192 max_in_flight = ?decision.max_in_flight,
193 rate_per_sec = ?decision.rate_per_sec,
194 prev_max_in_flight = ?previous.and_then(|p| p.max_in_flight),
195 prev_rate_per_sec = ?previous.and_then(|p| p.rate_per_sec),
196 "decision changed",
197 );
198 }
199 }
200}
201
202fn metadata_index(side: Side, op: MetadataOp) -> usize {
208 (side as usize) * N_META_OPS + (op as usize)
209}
210
211pub struct RoutingSink {
218 metadata: [Option<tokio::sync::mpsc::Sender<Sample>>; N_META_RESOURCES],
222 histograms: [Option<std::sync::Arc<std::sync::Mutex<crate::histogram::HistogramAccumulator>>>;
228 N_META_RESOURCES],
229 read: Option<tokio::sync::mpsc::Sender<Sample>>,
230 write: Option<tokio::sync::mpsc::Sender<Sample>>,
231 dropped: std::sync::Arc<std::sync::atomic::AtomicU64>,
232}
233
234impl RoutingSink {
235 pub fn dropped_samples(&self) -> u64 {
238 self.dropped.load(std::sync::atomic::Ordering::Relaxed)
239 }
240}
241
242impl SampleSink for RoutingSink {
243 fn record(&self, kind: ResourceKind, sample: &Sample) {
261 if let ResourceKind::Metadata(side, op) = kind
268 && let Some(acc) = &self.histograms[metadata_index(side, op)]
269 {
270 acc.lock()
271 .expect("histogram accumulator mutex poisoned")
272 .record(sample.latency());
273 }
274 let tx = match kind {
275 ResourceKind::Metadata(side, op) => self.metadata[metadata_index(side, op)].as_ref(),
276 ResourceKind::DataRead => self.read.as_ref(),
277 ResourceKind::DataWrite => self.write.as_ref(),
278 };
279 if let Some(tx) = tx {
280 match tx.try_send(*sample) {
281 Ok(()) => {}
282 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
283 self.dropped
284 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
285 }
286 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
287 }
289 }
290 }
291 }
292}
293
294pub struct RoutingSinkBuilder {
298 metadata: [Option<tokio::sync::mpsc::Sender<Sample>>; N_META_RESOURCES],
299 histograms: [Option<std::sync::Arc<std::sync::Mutex<crate::histogram::HistogramAccumulator>>>;
300 N_META_RESOURCES],
301 read: Option<tokio::sync::mpsc::Sender<Sample>>,
302 write: Option<tokio::sync::mpsc::Sender<Sample>>,
303 capacity: usize,
304 dropped: std::sync::Arc<std::sync::atomic::AtomicU64>,
305}
306
307impl Default for RoutingSinkBuilder {
308 fn default() -> Self {
309 Self {
310 metadata: [const { None }; N_META_RESOURCES],
311 histograms: [const { None }; N_META_RESOURCES],
312 read: None,
313 write: None,
314 capacity: DEFAULT_CHANNEL_CAPACITY,
315 dropped: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
316 }
317 }
318}
319
320impl RoutingSinkBuilder {
321 pub fn new() -> Self {
322 Self::default()
323 }
324 pub fn with_capacity(mut self, capacity: usize) -> Self {
326 self.capacity = capacity.max(1);
327 self
328 }
329 pub fn metadata_receiver(
334 &mut self,
335 side: Side,
336 op: MetadataOp,
337 ) -> tokio::sync::mpsc::Receiver<Sample> {
338 let (tx, rx) = tokio::sync::mpsc::channel(self.capacity);
339 self.metadata[metadata_index(side, op)] = Some(tx);
340 rx
341 }
342
343 pub fn metadata_histogram(
349 &mut self,
350 side: Side,
351 op: MetadataOp,
352 accumulator: std::sync::Arc<std::sync::Mutex<crate::histogram::HistogramAccumulator>>,
353 ) {
354 self.histograms[metadata_index(side, op)] = Some(accumulator);
355 }
356 pub fn read_receiver(&mut self) -> tokio::sync::mpsc::Receiver<Sample> {
358 let (tx, rx) = tokio::sync::mpsc::channel(self.capacity);
359 self.read = Some(tx);
360 rx
361 }
362 pub fn write_receiver(&mut self) -> tokio::sync::mpsc::Receiver<Sample> {
364 let (tx, rx) = tokio::sync::mpsc::channel(self.capacity);
365 self.write = Some(tx);
366 rx
367 }
368 pub fn build(self) -> RoutingSink {
369 RoutingSink {
370 metadata: self.metadata,
371 histograms: self.histograms,
372 read: self.read,
373 write: self.write,
374 dropped: self.dropped,
375 }
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use super::*;
382 use crate::controller::Outcome;
383 use crate::measurement::{Probe, clear_sample_sink, install_sample_sink};
384 use crate::{FixedController, NoopController};
385
386 fn make_sample(latency_ms: u64) -> Sample {
387 let start = std::time::Instant::now();
388 Sample {
389 started_at: start,
390 completed_at: start + std::time::Duration::from_millis(latency_ms),
391 bytes: 0,
392 outcome: Outcome::Ok,
393 }
394 }
395
396 #[tokio::test]
397 async fn control_unit_publishes_initial_decision_on_spawn() {
398 let (_tx, rx) = tokio::sync::mpsc::channel::<Sample>(32);
399 let controller = FixedController::with_concurrency(42);
400 let (unit, mut decision_rx, _snapshot_rx) =
401 ControlUnit::new("test", controller, rx, std::time::Duration::from_millis(10));
402 unit.spawn();
403 decision_rx
405 .changed()
406 .await
407 .expect("initial decision delivered");
408 let decision = *decision_rx.borrow();
409 assert_eq!(decision.max_in_flight, Some(42));
410 }
411
412 #[tokio::test]
413 async fn control_unit_feeds_samples_through_to_controller() {
414 struct CountingController {
416 count: std::sync::Arc<std::sync::atomic::AtomicU64>,
417 }
418 impl Controller for CountingController {
419 fn on_sample(&mut self, _s: &Sample) {
420 self.count
421 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
422 }
423 fn on_tick(&mut self, _now: std::time::Instant) -> Decision {
424 Decision::UNLIMITED
425 }
426 fn name(&self) -> &'static str {
427 "counting"
428 }
429 }
430 let count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
431 let (tx, rx) = tokio::sync::mpsc::channel::<Sample>(32);
432 let controller = CountingController {
433 count: count.clone(),
434 };
435 let (unit, _decision_rx, _snapshot_rx) = ControlUnit::new(
436 "test",
437 controller,
438 rx,
439 std::time::Duration::from_millis(100),
440 );
441 let handle = unit.spawn();
442 for _ in 0..5 {
443 tx.send(make_sample(1)).await.expect("send sample");
444 }
445 drop(tx);
447 handle.await.expect("control loop exits cleanly");
448 assert_eq!(count.load(std::sync::atomic::Ordering::Relaxed), 5);
449 }
450
451 #[tokio::test]
452 async fn routing_sink_dispatches_by_resource_kind() {
453 let mut builder = RoutingSinkBuilder::new();
454 let mut meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
455 let mut read_rx = builder.read_receiver();
456 let sink = builder.build();
457 sink.record(
459 ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
460 &make_sample(2),
461 );
462 let s = meta_rx.recv().await.expect("metadata sample delivered");
463 assert_eq!(s.bytes, 0);
464 sink.record(ResourceKind::DataRead, &make_sample(2));
466 let s = read_rx.recv().await.expect("read sample delivered");
467 assert_eq!(s.bytes, 0);
468 sink.record(ResourceKind::DataWrite, &make_sample(2));
470 }
472
473 #[tokio::test]
474 async fn routing_sink_separates_op_kinds() {
475 let mut builder = RoutingSinkBuilder::new();
479 let mut stat_rx = builder.metadata_receiver(Side::Destination, MetadataOp::Stat);
480 let mut unlink_rx = builder.metadata_receiver(Side::Destination, MetadataOp::Unlink);
481 let sink = builder.build();
482 sink.record(
483 ResourceKind::Metadata(Side::Destination, MetadataOp::Stat),
484 &make_sample(1),
485 );
486 sink.record(
487 ResourceKind::Metadata(Side::Destination, MetadataOp::Unlink),
488 &make_sample(2),
489 );
490 sink.record(
491 ResourceKind::Metadata(Side::Destination, MetadataOp::Unlink),
492 &make_sample(3),
493 );
494 assert!(stat_rx.recv().await.is_some());
496 assert!(stat_rx.try_recv().is_err());
497 assert!(unlink_rx.recv().await.is_some());
498 assert!(unlink_rx.recv().await.is_some());
499 assert!(unlink_rx.try_recv().is_err());
500 }
501
502 #[tokio::test]
503 async fn routing_sink_counts_dropped_samples_when_channel_is_full() {
504 let mut builder = RoutingSinkBuilder::new().with_capacity(2);
506 let _meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
507 let sink = builder.build();
508 for _ in 0..5 {
509 sink.record(
510 ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
511 &make_sample(1),
512 );
513 }
514 assert_eq!(sink.dropped_samples(), 3);
516 }
517
518 #[tokio::test]
519 async fn routing_sink_integrates_with_global_probe_api() {
520 let _guard = crate::measurement::SINK_GUARD.lock().await;
524 let mut builder = RoutingSinkBuilder::new();
525 let mut meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
526 let sink = builder.build();
527 install_sample_sink(std::sync::Arc::new(sink));
528 Probe::start_metadata(Side::Source, MetadataOp::Stat).complete_ok(0);
529 let s = meta_rx.recv().await.expect("sample flowed through");
530 assert_eq!(s.bytes, 0);
531 clear_sample_sink();
532 }
533
534 #[tokio::test]
535 async fn control_unit_exits_when_all_senders_dropped() {
536 let (tx, rx) = tokio::sync::mpsc::channel::<Sample>(32);
537 let (unit, _decision_rx, _snapshot_rx) = ControlUnit::new(
538 "test",
539 NoopController::new(),
540 rx,
541 std::time::Duration::from_millis(10),
542 );
543 let handle = unit.spawn();
544 drop(tx);
545 tokio::time::timeout(std::time::Duration::from_secs(1), handle)
546 .await
547 .expect("control loop exits within timeout")
548 .expect("control loop joins without panic");
549 }
550
551 #[tokio::test]
552 async fn routing_sink_records_to_histogram_synchronously() {
553 use crate::histogram::HistogramAccumulator;
557 let mut builder = RoutingSinkBuilder::new();
558 let _meta_rx = builder.metadata_receiver(Side::Source, MetadataOp::Stat);
559 let acc = std::sync::Arc::new(std::sync::Mutex::new(HistogramAccumulator::new()));
560 builder.metadata_histogram(Side::Source, MetadataOp::Stat, acc.clone());
561 let sink = builder.build();
562 sink.record(
563 ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
564 &make_sample(5),
565 );
566 sink.record(
567 ResourceKind::Metadata(Side::Source, MetadataOp::Stat),
568 &make_sample(7),
569 );
570 let snap = acc.lock().unwrap().snapshot_and_reset();
573 assert_eq!(snap.len(), 2);
574 }
575}