ai-agent 0.13.4

Idiomatic agent SDK inspired by the Claude Code source leak
Documentation
// Source: /data/home/swei/claudecode/openclaudecode/src/utils/stream.ts
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// FIFO buffer of `T` values with completion and error flags.
///
/// Values are appended with [`Stream::enqueue`]; [`Stream::done`] and
/// [`Stream::error`] record completion and failure. Every piece of state sits
/// behind its own mutex, so the compiler derives both `Send` and `Sync` for
/// `Stream<T>` whenever `T: Send`. No manual `unsafe impl` is required, and an
/// `Arc<Stream<T>>` can be handed to another thread.
pub struct Stream<T> {
    /// Values enqueued and not yet removed, oldest at the front.
    queue: Arc<Mutex<VecDeque<T>>>,
    /// Waker notified by `enqueue`, `done` and `error`. Nothing in this block
    /// stores one, so it stays `None` unless other code in the module sets it.
    read_waker: Arc<Mutex<Option<std::task::Waker>>>,
    /// Becomes `true` once `done` has been called.
    done: Arc<Mutex<bool>>,
    /// Most recent error passed to `error`, if any.
    error: Arc<Mutex<Option<Box<dyn std::error::Error + Send + Sync>>>>,
    /// Optional callback handed to `new`. A boxed `FnOnce() + Send` is not
    /// `Sync` by itself; keeping it behind a `Mutex` is what lets the whole
    /// struct be `Sync`. It is not invoked anywhere in this block.
    returned_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
    /// Initialised to `false` by `new` and not touched again in this block.
    /// NOTE(review): presumably meant to guard against iterating twice; confirm.
    started: Arc<Mutex<bool>>,
}

impl<T: Send + 'static> Stream<T> {
    /// Creates an empty stream that is neither done nor failed.
    ///
    /// `returned` is an optional callback that is stored on the stream; pass
    /// `None::<fn()>` when there is none.
    pub fn new(returned: Option<impl FnOnce() + Send + 'static>) -> Self {
        Stream {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            read_waker: Arc::new(Mutex::new(None)),
            done: Arc::new(Mutex::new(false)),
            error: Arc::new(Mutex::new(None)),
            returned_fn: Mutex::new(
                returned.map(|f| Box::new(f) as Box<dyn FnOnce() + Send>),
            ),
            started: Arc::new(Mutex::new(false)),
        }
    }

    /// Appends `value` to the back of the queue.
    ///
    /// The registered waker (if any) is notified only when the queue goes from
    /// empty to holding exactly one element.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn enqueue(&self, value: T) {
        // Scope the queue guard so it is released before the waker is touched.
        let became_non_empty = {
            let mut queue = self.queue.lock().unwrap();
            queue.push_back(value);
            queue.len() == 1
        };

        if became_non_empty {
            self.wake_reader();
        }
    }

    /// Marks the stream as finished and notifies the registered waker, if any.
    ///
    /// # Panics
    ///
    /// Panics if the `done` mutex is poisoned.
    pub fn done(&self) {
        *self.done.lock().unwrap() = true;
        self.wake_reader();
    }

    /// Records `err` as the stream's error (replacing any earlier one) and
    /// notifies the registered waker, if any.
    ///
    /// # Panics
    ///
    /// Panics if the `error` mutex is poisoned.
    pub fn error(&self, err: Box<dyn std::error::Error + Send + Sync>) {
        *self.error.lock().unwrap() = Some(err);
        self.wake_reader();
    }

    /// Wakes the stored reader waker, if one is registered.
    ///
    /// A poisoned waker mutex is ignored: notification is best-effort and must
    /// not turn a producer call into a panic.
    fn wake_reader(&self) {
        if let Ok(slot) = self.read_waker.lock() {
            if let Some(waker) = slot.as_ref() {
                waker.wake_by_ref();
            }
        }
    }
}

impl<T: Send + 'static> Stream<T> {
    /// Consumes a shared handle to the stream and wraps it in a
    /// [`StreamIter`], which pops queued values from the front.
    pub fn iter(self: Arc<Self>) -> StreamIter<T> {
        let stream = self;
        StreamIter { stream }
    }
}

/// Iterator handle over a shared [`Stream`].
///
/// The struct itself carries no trait bounds; the `Send + 'static`
/// requirements are stated on the `impl` blocks that actually need them.
pub struct StreamIter<T> {
    /// Shared handle to the stream whose queue is drained.
    stream: Arc<Stream<T>>,
}

impl<T: Send + 'static> Iterator for StreamIter<T> {
    type Item = T;

    /// Removes and returns the oldest queued value without blocking.
    ///
    /// Returns `None` whenever the queue is empty at the moment of the call;
    /// the stream's `done` and `error` flags are not consulted.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    fn next(&mut self) -> Option<Self::Item> {
        let mut pending = self.stream.queue.lock().unwrap();
        pending.pop_front()
    }
}

impl<T> Drop for Stream<T> {
    /// Marks the stream as done when it is dropped.
    ///
    /// A poisoned `done` mutex is recovered instead of unwrapped: panicking
    /// inside `drop` while another panic is already unwinding aborts the
    /// process, and the flag is a plain `bool` that cannot be left in an
    /// inconsistent state.
    fn drop(&mut self) {
        let mut done = self
            .done
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *done = true;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Values come back out in the order they were enqueued.
    #[test]
    fn test_stream_enqueue_dequeue() {
        let stream: Arc<Stream<i32>> = Arc::new(Stream::new(None::<fn()>));

        stream.enqueue(1);
        stream.enqueue(2);

        let mut iter = stream.iter();
        assert_eq!(iter.next(), Some(1));
        assert_eq!(iter.next(), Some(2));
    }

    /// Items queued before `done` are still delivered; afterwards the
    /// iterator yields `None`.
    #[test]
    fn test_stream_done() {
        let stream = Arc::new(Stream::new(None::<fn()>));

        stream.enqueue(1);
        stream.done();

        let mut iter = stream.iter();
        assert_eq!(iter.next(), Some(1));
        assert_eq!(iter.next(), None);
    }

    /// An iterator over a stream with nothing queued yields `None` right away.
    #[test]
    fn test_stream_empty_yields_none() {
        let stream: Arc<Stream<i32>> = Arc::new(Stream::new(None::<fn()>));

        let mut iter = stream.iter();
        assert_eq!(iter.next(), None);
    }
}