Skip to main content

objectiveai_api/
util.rs

1//! Utility types for streaming and choice indexing.
2
3use dashmap::DashMap;
4use futures::Stream;
5use std::sync::atomic::AtomicU64;
6
7/// Assigns sequential indices to concurrent streams in first-come-first-served order.
8///
9/// When multiple concurrent streams need unique indices, this struct ensures
10/// each stream gets the next available index. The first stream to request an
11/// index for a given native key gets index 0 (or `initial`), the next gets 1, etc.
12///
13/// Thread-safe: uses atomic operations and concurrent hash map.
14pub struct ChoiceIndexer {
15    /// Counter for the next index to assign.
16    counter: AtomicU64,
17    /// Map from native keys to their assigned indices.
18    indices: DashMap<usize, u64>,
19}
20
21impl ChoiceIndexer {
22    /// Creates a new choice indexer starting from the given initial value.
23    pub fn new(initial: u64) -> Self {
24        Self {
25            counter: AtomicU64::new(initial),
26            indices: DashMap::new(),
27        }
28    }
29
30    /// Gets the index for a native key, assigning the next available index if new.
31    ///
32    /// First-come-first-served: the first caller for a given native key gets
33    /// the current counter value, then the counter increments for the next caller.
34    pub fn get(&self, native_index: usize) -> u64 {
35        *self.indices.entry(native_index).or_insert_with(|| {
36            self.counter
37                .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
38        })
39    }
40}
41
/// A single-shot stream: it produces its one stored item and is then finished.
///
/// Handy when a lone value (for example an error) has to be returned through
/// an interface that expects a stream.
///
/// The wrapped `Option` is `Some` until the item has been handed out and
/// `None` afterwards.
pub struct StreamOnce<T>(Option<T>);

impl<T> StreamOnce<T> {
    /// Wraps `item` so that it becomes the only element the stream yields.
    pub fn new(item: T) -> Self {
        let pending = Some(item);
        StreamOnce(pending)
    }
}
54
55impl<T> Stream for StreamOnce<T>
56where
57    T: Unpin,
58{
59    type Item = T;
60
61    fn poll_next(
62        mut self: std::pin::Pin<&mut Self>,
63        _cx: &mut std::task::Context<'_>,
64    ) -> std::task::Poll<Option<Self::Item>> {
65        std::task::Poll::Ready(self.as_mut().get_mut().0.take())
66    }
67}