blueprint_engine_core/value/
generator.rs

1use std::fmt;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4
5use indexmap::IndexMap;
6use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
7
8use super::Value;
9
10pub struct StreamIterator {
11    rx: Mutex<mpsc::Receiver<Option<String>>>,
12    content: Mutex<String>,
13    done: Mutex<bool>,
14    result: Mutex<Option<IndexMap<String, Value>>>,
15}
16
17impl StreamIterator {
18    pub fn new(rx: mpsc::Receiver<Option<String>>) -> Self {
19        Self {
20            rx: Mutex::new(rx),
21            content: Mutex::new(String::new()),
22            done: Mutex::new(false),
23            result: Mutex::new(None),
24        }
25    }
26
27    pub async fn next(&self) -> Option<Value> {
28        let mut done = self.done.lock().await;
29        if *done {
30            return None;
31        }
32
33        let mut rx = self.rx.lock().await;
34        match rx.recv().await {
35            Some(Some(chunk)) => {
36                let mut content = self.content.lock().await;
37                content.push_str(&chunk);
38                Some(Value::String(Arc::new(chunk)))
39            }
40            Some(None) | None => {
41                *done = true;
42                None
43            }
44        }
45    }
46
47    pub async fn set_result(&self, result: IndexMap<String, Value>) {
48        let mut r = self.result.lock().await;
49        *r = Some(result);
50    }
51
52    pub fn get_attr(&self, name: &str) -> Option<Value> {
53        match name {
54            "content" => {
55                let content = self.content.try_lock().ok()?;
56                Some(Value::String(Arc::new(content.clone())))
57            }
58            "done" => {
59                let done = self.done.try_lock().ok()?;
60                Some(Value::Bool(*done))
61            }
62            "result" => {
63                let result = self.result.try_lock().ok()?;
64                match result.as_ref() {
65                    Some(map) => Some(Value::Dict(Arc::new(RwLock::new(map.clone())))),
66                    None => Some(Value::None),
67                }
68            }
69            _ => None,
70        }
71    }
72}
73
74pub enum GeneratorMessage {
75    Yielded(Value, oneshot::Sender<()>),
76    Complete,
77}
78
79pub struct Generator {
80    rx: Mutex<mpsc::Receiver<GeneratorMessage>>,
81    done: AtomicBool,
82    pub name: String,
83}
84
85impl Generator {
86    pub fn new(rx: mpsc::Receiver<GeneratorMessage>, name: String) -> Self {
87        Self {
88            rx: Mutex::new(rx),
89            done: AtomicBool::new(false),
90            name,
91        }
92    }
93
94    pub async fn next(&self) -> Option<Value> {
95        if self.done.load(Ordering::SeqCst) {
96            return None;
97        }
98
99        let mut rx = self.rx.lock().await;
100        match rx.recv().await {
101            Some(GeneratorMessage::Yielded(value, resume_tx)) => {
102                let _ = resume_tx.send(());
103                Some(value)
104            }
105            Some(GeneratorMessage::Complete) | None => {
106                self.done.store(true, Ordering::SeqCst);
107                None
108            }
109        }
110    }
111
112    pub fn is_done(&self) -> bool {
113        self.done.load(Ordering::SeqCst)
114    }
115}
116
117impl fmt::Debug for Generator {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        write!(f, "<generator {}>", self.name)
120    }
121}