blueprint_engine_core/value/
generator.rs1use std::fmt;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4
5use indexmap::IndexMap;
6use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
7
8use super::Value;
9
10pub struct StreamIterator {
11 rx: Mutex<mpsc::Receiver<Option<String>>>,
12 content: Mutex<String>,
13 done: Mutex<bool>,
14 result: Mutex<Option<IndexMap<String, Value>>>,
15}
16
17impl StreamIterator {
18 pub fn new(rx: mpsc::Receiver<Option<String>>) -> Self {
19 Self {
20 rx: Mutex::new(rx),
21 content: Mutex::new(String::new()),
22 done: Mutex::new(false),
23 result: Mutex::new(None),
24 }
25 }
26
27 pub async fn next(&self) -> Option<Value> {
28 let mut done = self.done.lock().await;
29 if *done {
30 return None;
31 }
32
33 let mut rx = self.rx.lock().await;
34 match rx.recv().await {
35 Some(Some(chunk)) => {
36 let mut content = self.content.lock().await;
37 content.push_str(&chunk);
38 Some(Value::String(Arc::new(chunk)))
39 }
40 Some(None) | None => {
41 *done = true;
42 None
43 }
44 }
45 }
46
47 pub async fn set_result(&self, result: IndexMap<String, Value>) {
48 let mut r = self.result.lock().await;
49 *r = Some(result);
50 }
51
52 pub fn get_attr(&self, name: &str) -> Option<Value> {
53 match name {
54 "content" => {
55 let content = self.content.try_lock().ok()?;
56 Some(Value::String(Arc::new(content.clone())))
57 }
58 "done" => {
59 let done = self.done.try_lock().ok()?;
60 Some(Value::Bool(*done))
61 }
62 "result" => {
63 let result = self.result.try_lock().ok()?;
64 match result.as_ref() {
65 Some(map) => Some(Value::Dict(Arc::new(RwLock::new(map.clone())))),
66 None => Some(Value::None),
67 }
68 }
69 _ => None,
70 }
71 }
72}
73
74pub enum GeneratorMessage {
75 Yielded(Value, oneshot::Sender<()>),
76 Complete,
77}
78
79pub struct Generator {
80 rx: Mutex<mpsc::Receiver<GeneratorMessage>>,
81 done: AtomicBool,
82 pub name: String,
83}
84
85impl Generator {
86 pub fn new(rx: mpsc::Receiver<GeneratorMessage>, name: String) -> Self {
87 Self {
88 rx: Mutex::new(rx),
89 done: AtomicBool::new(false),
90 name,
91 }
92 }
93
94 pub async fn next(&self) -> Option<Value> {
95 if self.done.load(Ordering::SeqCst) {
96 return None;
97 }
98
99 let mut rx = self.rx.lock().await;
100 match rx.recv().await {
101 Some(GeneratorMessage::Yielded(value, resume_tx)) => {
102 let _ = resume_tx.send(());
103 Some(value)
104 }
105 Some(GeneratorMessage::Complete) | None => {
106 self.done.store(true, Ordering::SeqCst);
107 None
108 }
109 }
110 }
111
112 pub fn is_done(&self) -> bool {
113 self.done.load(Ordering::SeqCst)
114 }
115}
116
117impl fmt::Debug for Generator {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 write!(f, "<generator {}>", self.name)
120 }
121}