1use std::{
8 collections::VecDeque,
9 sync::{Arc, Mutex},
10};
11
12use crate::{
13 env::Cx,
14 error::{Error, Result},
15 event::{Event, EventKind, EventSource, Tick, validate_ticks},
16 expr::Expr,
17 id::{CORE_SEQUENCE_CLASS_ID, Symbol},
18 list::ListValue,
19 object::{ClassRef, Object},
20 ref_id::Ref,
21 ref_resolver::value_from_ref,
22 stream::Stream,
23 term::Term,
24 value::Value,
25};
26
27#[derive(Clone, Debug)]
29pub struct SequenceItem {
30 value: Value,
31 ticks: Vec<Tick>,
32}
33
34impl SequenceItem {
35 pub fn new(value: Value) -> Self {
37 Self {
38 value,
39 ticks: Vec::new(),
40 }
41 }
42
43 pub fn with_ticks(value: Value, ticks: Vec<Tick>) -> Result<Self> {
45 validate_ticks(&ticks)?;
46 Ok(Self { value, ticks })
47 }
48
49 pub fn value(&self) -> &Value {
51 &self.value
52 }
53
54 pub fn ticks(&self) -> &[Tick] {
56 &self.ticks
57 }
58
59 pub fn into_value(self, cx: &mut Cx) -> Result<Value> {
61 sequence_item_value(cx, self)
62 }
63}
64
65pub trait Sequence: Send + Sync {
67 fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>>;
69
70 fn close(&self, _cx: &mut Cx) -> Result<()> {
72 Ok(())
73 }
74
75 fn peek_item(&self, _cx: &mut Cx) -> Result<Option<SequenceItem>> {
77 Err(Error::Eval("sequence does not support peek".to_owned()))
78 }
79
80 fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
82 Ok(false)
83 }
84}
85
86pub fn seq_next(cx: &mut Cx, sequence: &dyn Sequence) -> Result<Option<SequenceItem>> {
88 sequence.next_item(cx)
89}
90
91pub fn seq_close(cx: &mut Cx, sequence: &dyn Sequence) -> Result<()> {
93 sequence.close(cx)
94}
95
96pub fn seq_peek(cx: &mut Cx, sequence: &dyn Sequence) -> Result<Option<SequenceItem>> {
98 sequence.peek_item(cx)
99}
100
101pub fn seq_is_done(cx: &mut Cx, sequence: &dyn Sequence) -> Result<bool> {
103 sequence.is_done(cx)
104}
105
106pub fn seq_next_value(cx: &mut Cx, target: &Value) -> Result<Option<SequenceItem>> {
108 if let Some(sequence) = target.object().as_sequence() {
109 return seq_next(cx, sequence);
110 }
111 if let Some(stream) = target.object().as_stream() {
112 return stream.next(cx).map(|item| item.map(SequenceItem::new));
113 }
114 Err(Error::TypeMismatch {
115 expected: "sequence",
116 found: "non-sequence",
117 })
118}
119
120pub fn seq_close_value(cx: &mut Cx, target: &Value) -> Result<()> {
122 if let Some(sequence) = target.object().as_sequence() {
123 return seq_close(cx, sequence);
124 }
125 if let Some(stream) = target.object().as_stream() {
126 return stream.close(cx);
127 }
128 Err(Error::TypeMismatch {
129 expected: "sequence",
130 found: "non-sequence",
131 })
132}
133
134pub fn sequence_item_value(cx: &mut Cx, item: SequenceItem) -> Result<Value> {
137 if item.ticks.is_empty() {
138 return Ok(item.value);
139 }
140
141 let ticks = item
142 .ticks
143 .into_iter()
144 .map(|tick| tick_value(cx, tick))
145 .collect::<Result<Vec<_>>>()?;
146 let ticks = cx.factory().list(ticks)?;
147 cx.factory().table(vec![
148 (Symbol::new("payload"), item.value),
149 (Symbol::new("ticks"), ticks),
150 ])
151}
152
153pub fn sequence_item_from_event(cx: &mut Cx, event: Event) -> Result<Option<SequenceItem>> {
156 match event.kind {
157 EventKind::Chunk { payload } | EventKind::Final(payload) => {
158 SequenceItem::with_ticks(value_from_ref(cx, &payload)?, event.ticks).map(Some)
159 }
160 EventKind::Failed(error) => Err(error_from_failed_ref(cx, &error)),
161 EventKind::Started { .. }
162 | EventKind::Claim { .. }
163 | EventKind::Diagnostic(_)
164 | EventKind::Trace(_)
165 | EventKind::EffectRequested { .. }
166 | EventKind::EffectResolved { .. }
167 | EventKind::Capture { .. }
168 | EventKind::Card { .. }
169 | EventKind::Done => Ok(None),
170 }
171}
172
173pub struct EventSourceSequence {
176 source: Arc<dyn EventSource>,
177 state: Mutex<EventSourceSequenceState>,
178}
179
180#[derive(Debug, Default)]
181struct EventSourceSequenceState {
182 pending: VecDeque<Event>,
183 done: bool,
184}
185
186impl EventSourceSequence {
187 pub fn new(source: Arc<dyn EventSource>) -> Self {
189 Self {
190 source,
191 state: Mutex::new(EventSourceSequenceState::default()),
192 }
193 }
194}
195
196impl Object for EventSourceSequence {
197 fn display(&self, _cx: &mut Cx) -> Result<String> {
198 Ok("#<event-source-sequence>".to_owned())
199 }
200
201 fn as_any(&self) -> &dyn std::any::Any {
202 self
203 }
204}
205
206impl crate::ObjectCompat for EventSourceSequence {
207 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
208 cx.factory().class_stub(
209 CORE_SEQUENCE_CLASS_ID,
210 Symbol::qualified("core", "EventSourceSequence"),
211 )
212 }
213 fn as_sequence(&self) -> Option<&dyn Sequence> {
214 Some(self)
215 }
216}
217
218impl Sequence for EventSourceSequence {
219 fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
220 while let Some(event) = self.next_event(cx)? {
221 let done = matches!(event.kind, EventKind::Done);
222 if let Some(item) = sequence_item_from_event(cx, event)? {
223 return Ok(Some(item));
224 }
225 if done {
226 self.mark_done()?;
227 return Ok(None);
228 }
229 }
230 Ok(None)
231 }
232
233 fn close(&self, cx: &mut Cx) -> Result<()> {
234 {
235 let mut state = self
236 .state
237 .lock()
238 .map_err(|_| Error::PoisonedLock("event source sequence"))?;
239 state.pending.clear();
240 state.done = true;
241 }
242 self.source.close(cx)
243 }
244
245 fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
246 loop {
247 if let Some(item) = self.pending_item(cx)? {
248 return Ok(Some(item));
249 }
250 if self.is_done(cx)? {
251 return Ok(None);
252 }
253 let Some(event) = self.source.next(cx)? else {
254 self.mark_done()?;
255 return Ok(None);
256 };
257 let done = matches!(event.kind, EventKind::Done);
258 self.push_pending(event)?;
259 if done {
260 self.mark_done()?;
261 }
262 }
263 }
264
265 fn is_done(&self, _cx: &mut Cx) -> Result<bool> {
266 let state = self
267 .state
268 .lock()
269 .map_err(|_| Error::PoisonedLock("event source sequence"))?;
270 Ok(state.done && state.pending.is_empty())
271 }
272}
273
274impl EventSourceSequence {
275 fn next_event(&self, cx: &mut Cx) -> Result<Option<Event>> {
276 {
277 let mut state = self
278 .state
279 .lock()
280 .map_err(|_| Error::PoisonedLock("event source sequence"))?;
281 if let Some(event) = state.pending.pop_front() {
282 return Ok(Some(event));
283 }
284 if state.done {
285 return Ok(None);
286 }
287 }
288
289 match self.source.next(cx)? {
290 Some(event) => Ok(Some(event)),
291 None => {
292 self.mark_done()?;
293 Ok(None)
294 }
295 }
296 }
297
298 fn pending_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
299 let events = self
300 .state
301 .lock()
302 .map_err(|_| Error::PoisonedLock("event source sequence"))?
303 .pending
304 .iter()
305 .cloned()
306 .collect::<Vec<_>>();
307 for event in events {
308 if let Some(item) = sequence_item_from_event(cx, event)? {
309 return Ok(Some(item));
310 }
311 }
312 Ok(None)
313 }
314
315 fn push_pending(&self, event: Event) -> Result<()> {
316 let mut state = self
317 .state
318 .lock()
319 .map_err(|_| Error::PoisonedLock("event source sequence"))?;
320 state.pending.push_back(event);
321 Ok(())
322 }
323
324 fn mark_done(&self) -> Result<()> {
325 let mut state = self
326 .state
327 .lock()
328 .map_err(|_| Error::PoisonedLock("event source sequence"))?;
329 state.done = true;
330 Ok(())
331 }
332}
333
334#[derive(Debug)]
337pub struct ListSequence {
338 state: Mutex<ListSequenceState>,
339}
340
341#[derive(Debug)]
342struct ListSequenceState {
343 next: Option<Value>,
344 closed: bool,
345}
346
347impl ListSequence {
348 pub fn new(list: Value) -> Self {
350 Self {
351 state: Mutex::new(ListSequenceState {
352 next: Some(list),
353 closed: false,
354 }),
355 }
356 }
357}
358
359impl Object for ListSequence {
360 fn display(&self, _cx: &mut Cx) -> Result<String> {
361 Ok("#<list-sequence>".to_owned())
362 }
363
364 fn as_any(&self) -> &dyn std::any::Any {
365 self
366 }
367}
368
369impl crate::ObjectCompat for ListSequence {
370 fn class(&self, cx: &mut Cx) -> Result<ClassRef> {
371 cx.factory().class_stub(
372 CORE_SEQUENCE_CLASS_ID,
373 Symbol::qualified("core", "Sequence"),
374 )
375 }
376 fn as_sequence(&self) -> Option<&dyn Sequence> {
377 Some(self)
378 }
379}
380
381impl Sequence for ListSequence {
382 fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
383 let mut state = self
384 .state
385 .lock()
386 .map_err(|_| Error::PoisonedLock("list sequence"))?;
387 if state.closed {
388 return Ok(None);
389 }
390
391 let Some(node) = state.next.clone() else {
392 state.closed = true;
393 return Ok(None);
394 };
395 let list = list_from_value(&node)?;
396 if list.is_empty(cx)? {
397 state.next = None;
398 state.closed = true;
399 return Ok(None);
400 }
401
402 let value = list
403 .car(cx)?
404 .ok_or_else(|| Error::Eval("list car missing for non-empty list".to_owned()))?;
405 state.next = list.cdr(cx)?;
406 Ok(Some(SequenceItem::new(value)))
407 }
408
409 fn close(&self, _cx: &mut Cx) -> Result<()> {
410 let mut state = self
411 .state
412 .lock()
413 .map_err(|_| Error::PoisonedLock("list sequence"))?;
414 state.next = None;
415 state.closed = true;
416 Ok(())
417 }
418
419 fn peek_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
420 let mut state = self
421 .state
422 .lock()
423 .map_err(|_| Error::PoisonedLock("list sequence"))?;
424 if state.closed {
425 return Ok(None);
426 }
427
428 let Some(node) = state.next.as_ref() else {
429 state.closed = true;
430 return Ok(None);
431 };
432 let list = list_from_value(node)?;
433 if list.is_empty(cx)? {
434 state.next = None;
435 state.closed = true;
436 return Ok(None);
437 }
438 list.car(cx).map(|item| item.map(SequenceItem::new))
439 }
440
441 fn is_done(&self, cx: &mut Cx) -> Result<bool> {
442 let mut state = self
443 .state
444 .lock()
445 .map_err(|_| Error::PoisonedLock("list sequence"))?;
446 if state.closed {
447 return Ok(true);
448 }
449
450 let Some(node) = state.next.as_ref() else {
451 state.closed = true;
452 return Ok(true);
453 };
454 let done = list_from_value(node)?.is_empty(cx)?;
455 if done {
456 state.next = None;
457 state.closed = true;
458 }
459 Ok(done)
460 }
461}
462
463impl<T> Sequence for T
464where
465 T: Stream + ?Sized,
466{
467 fn next_item(&self, cx: &mut Cx) -> Result<Option<SequenceItem>> {
468 self.next(cx).map(|item| item.map(SequenceItem::new))
469 }
470
471 fn close(&self, cx: &mut Cx) -> Result<()> {
472 Stream::close(self, cx)
473 }
474}
475
476fn list_from_value(value: &Value) -> Result<&dyn ListValue> {
477 value.object().as_list().ok_or(Error::TypeMismatch {
478 expected: "list",
479 found: "non-list",
480 })
481}
482
483fn tick_value(cx: &mut Cx, tick: Tick) -> Result<Value> {
484 let clock = cx.factory().symbol(tick.clock)?;
485 let index = ref_value(cx, tick.index)?;
486 cx.factory().table(vec![
487 (Symbol::new("clock"), clock),
488 (Symbol::new("index"), index),
489 ])
490}
491
492fn ref_value(cx: &mut Cx, reference: Ref) -> Result<Value> {
493 cx.factory().expr(Expr::from(Term::Ref(reference)))
494}
495
496fn error_from_failed_ref(cx: &mut Cx, reference: &Ref) -> Error {
497 match value_from_ref(cx, reference) {
498 Ok(value) => match value.object().display(cx) {
499 Ok(message) => Error::Eval(message),
500 Err(err) => err,
501 },
502 Err(err) => err,
503 }
504}