objectiveai_api/util.rs
1//! Utility types for streaming and choice indexing.
2
3use dashmap::DashMap;
4use futures::Stream;
5use std::sync::atomic::AtomicU64;
6
7/// Assigns sequential indices to concurrent streams in first-come-first-served order.
8///
9/// When multiple concurrent streams need unique indices, this struct ensures
10/// each stream gets the next available index. The first stream to request an
11/// index for a given native key gets index 0 (or `initial`), the next gets 1, etc.
12///
13/// Thread-safe: uses atomic operations and concurrent hash map.
14pub struct ChoiceIndexer {
15 /// Counter for the next index to assign.
16 counter: AtomicU64,
17 /// Map from native keys to their assigned indices.
18 indices: DashMap<usize, u64>,
19}
20
21impl ChoiceIndexer {
22 /// Creates a new choice indexer starting from the given initial value.
23 pub fn new(initial: u64) -> Self {
24 Self {
25 counter: AtomicU64::new(initial),
26 indices: DashMap::new(),
27 }
28 }
29
30 /// Gets the index for a native key, assigning the next available index if new.
31 ///
32 /// First-come-first-served: the first caller for a given native key gets
33 /// the current counter value, then the counter increments for the next caller.
34 pub fn get(&self, native_index: usize) -> u64 {
35 *self.indices.entry(native_index).or_insert_with(|| {
36 self.counter
37 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
38 })
39 }
40}
41
/// A stream that yields a single item and then finishes.
///
/// Handy when one value (an error, for instance) has to be passed through an
/// interface that expects a stream.
///
/// The wrapped `Option` holds the item until it is handed out; `None` means
/// the item has already been yielded.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct StreamOnce<T>(Option<T>);

impl<T> StreamOnce<T> {
    /// Creates a stream whose only item is `item`.
    pub fn new(item: T) -> Self {
        Self(Some(item))
    }
}
54
55impl<T> Stream for StreamOnce<T>
56where
57 T: Unpin,
58{
59 type Item = T;
60
61 fn poll_next(
62 mut self: std::pin::Pin<&mut Self>,
63 _cx: &mut std::task::Context<'_>,
64 ) -> std::task::Poll<Option<Self::Item>> {
65 std::task::Poll::Ready(self.as_mut().get_mut().0.take())
66 }
67}