Skip to main content

batch_aint_one/policies/
duration.rs

1//! Duration policy: Process batches after a given duration or when full.
2//!
3//! Prioritises regularity.
4
5use std::time::Duration;
6
7use crate::{Processor, batch_queue::BatchQueue};
8
9use super::{OnAdd, OnFinish, OnFull};
10
11pub(super) fn on_add<P: Processor>(
12    duration: Duration,
13    on_full: OnFull,
14    batch_queue: &BatchQueue<P>,
15) -> OnAdd {
16    if batch_queue.adding_to_new_batch() {
17        return OnAdd::AddAndProcessAfter(duration);
18    }
19
20    if !batch_queue.last_space_in_batch() {
21        return OnAdd::Add;
22    }
23
24    // Last space in batch
25    match on_full {
26        OnFull::Process if !batch_queue.at_max_total_processing_capacity() => OnAdd::AddAndProcess,
27        OnFull::Process | OnFull::Reject => OnAdd::Add,
28    }
29}
30
31pub(super) fn on_finish<P: Processor>(batch_queue: &BatchQueue<P>) -> OnFinish {
32    if batch_queue.has_next_batch_timeout_expired() || batch_queue.is_next_batch_full() {
33        OnFinish::ProcessNext
34    } else {
35        OnFinish::DoNothing
36    }
37}
38
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use tokio::sync::{Notify, mpsc};

    use crate::{
        Limits,
        batch_inner::Generation,
        batch_queue::BatchQueue,
        error::{ConcurrencyStatus, RejectionReason},
        policies::BatchingPolicy,
        worker::Message,
    };

    use super::super::test_utils::*;
    use super::*;

    /// Adding to an empty queue must schedule processing after the policy's duration.
    #[test]
    fn schedules_timeout_when_adding_to_empty() {
        let timeout = Duration::from_millis(100);
        let queue = BatchQueue::<TestProcessor>::new(
            String::from("test"),
            String::from("key"),
            Limits::builder().max_batch_size(2).build(),
        );

        let decision = BatchingPolicy::Duration(timeout, OnFull::Process).on_add(&queue);

        assert_matches!(decision, OnAdd::AddAndProcessAfter(d) if d == timeout);
    }

    /// With `OnFull::Reject`, a full queue rejects new items even though nothing is
    /// being processed yet.
    #[test]
    fn onfull_reject_rejects_when_full_but_not_processing() {
        // One batch of one item, one batch in the queue, one concurrent batch per key.
        let limits = Limits::builder()
            .max_batch_size(1)
            .max_key_concurrency(1)
            .max_batch_queue_size(1)
            .build();
        let mut queue =
            BatchQueue::<TestProcessor>::new(String::from("test"), String::from("key"), limits);

        // Occupy the only slot without starting any processing.
        queue.push(new_item(String::from("key"), String::from("item1")));

        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Reject);
        let decision = policy.on_add(&queue);

        // Expect a `BatchQueueFull` rejection that reports concurrency as still available.
        assert_matches!(
            decision,
            OnAdd::Reject(RejectionReason::BatchQueueFull(
                ConcurrencyStatus::Available
            ))
        );
    }

    /// A batch whose timeout fires while another batch is still being processed must be
    /// picked up as soon as the running batch finishes.
    ///
    /// Set-up: duration policy, `max_key_concurrency = 1`, `max_batch_size = 2`.
    ///
    /// 1. Two items fill the first batch, which starts processing.
    /// 2. A third item opens a second batch.
    /// 3. The second batch times out while the first is still in flight.
    /// 4. When the first batch finishes, the policy asks for the next batch to be processed.
    #[tokio::test]
    async fn timeout_while_processing() {
        let processor = ControlledProcessor::default();
        let limits = Limits::builder()
            .max_batch_size(2)
            .max_key_concurrency(1)
            .build();
        let mut queue = BatchQueue::<ControlledProcessor>::new(String::from("test"), (), limits);
        let policy = BatchingPolicy::Duration(Duration::from_millis(100), OnFull::Process);

        // --- 1. Fill the first batch -------------------------------------------------
        // The first item opens a new batch, so a timeout is scheduled for it.
        let first_decision = policy.on_add(&queue);
        assert_matches!(first_decision, OnAdd::AddAndProcessAfter(_));
        let first_batch_gate = Arc::new(Notify::new());
        queue.push(new_item((), Arc::clone(&first_batch_gate).notified_owned()));

        // The second item takes the last space, so the batch should be processed.
        let second_decision = policy.on_add(&queue);
        assert_matches!(second_decision, OnAdd::AddAndProcess);
        queue.push(new_item((), Arc::clone(&first_batch_gate).notified_owned()));

        // Kick off processing of the first batch.
        let (finished_tx, mut finished_rx) = mpsc::channel(1);
        queue.process_next_ready_batch(processor, finished_tx);

        // --- 2. Third item starts the second batch -----------------------------------
        let third_decision = policy.on_add(&queue);
        assert_matches!(third_decision, OnAdd::AddAndProcessAfter(_));
        let second_batch_gate = Arc::new(Notify::new());
        queue.push(new_item((), second_batch_gate.notified_owned()));
        let (timeout_tx, mut timeout_rx) = mpsc::channel(1);
        queue.process_after(Duration::from_millis(1), timeout_tx);

        // --- 3. Second batch times out while the first is still running --------------
        let timeout_msg = timeout_rx.recv().await.unwrap();
        let second_gen = Generation::default().next();
        assert_matches!(timeout_msg, Message::TimedOut(_, timed_out_gen) => {
            assert_eq!(timed_out_gen, second_gen);
        });
        // Already at maximum concurrency, so the timeout cannot trigger processing.
        let timeout_decision = policy.on_timeout(second_gen, &queue);
        assert_matches!(timeout_decision, super::super::OnGenerationEvent::DoNothing);

        // --- 4. First batch completes ------------------------------------------------
        first_batch_gate.notify_waiters();
        let finished_msg = finished_rx.recv().await.unwrap();
        assert_matches!(finished_msg, Message::Finished(_, _));

        queue.mark_processed();

        // The timed-out second batch should now be processed.
        let finish_decision = policy.on_finish(&queue);
        assert_matches!(finish_decision, OnFinish::ProcessNext);
    }
}