1use std::fmt;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, Mutex, mpsc};
7
8#[cfg(feature = "native")]
9use tl_ai::{TlModel, TlTensor};
10#[cfg(feature = "native")]
11use tl_data::{ArrowSchema, DataFrame};
12#[cfg(feature = "gpu")]
13use tl_gpu::GpuTensor;
14#[cfg(feature = "native")]
15use tl_stream::{AgentDef, ConnectorConfig, PipelineDef, PipelineResult, StreamDef};
16
17use crate::chunk::{BuiltinId, Prototype};
18
19#[derive(Clone)]
21pub enum VmValue {
22 Int(i64),
23 Float(f64),
24 String(Arc<str>),
25 Bool(bool),
26 None,
27 List(Box<Vec<VmValue>>),
28 Function(Arc<VmClosure>),
30 Builtin(BuiltinId),
32 #[cfg(feature = "native")]
34 Table(VmTable),
35 #[cfg(feature = "native")]
37 Schema(VmSchema),
38 #[cfg(feature = "native")]
40 Tensor(Arc<TlTensor>),
41 #[cfg(feature = "native")]
43 Model(Arc<TlModel>),
44 #[cfg(feature = "native")]
46 Connector(Arc<ConnectorConfig>),
47 #[cfg(feature = "native")]
49 PipelineResult(Arc<PipelineResult>),
50 #[cfg(feature = "native")]
52 PipelineDef(Arc<PipelineDef>),
53 #[cfg(feature = "native")]
55 StreamDef(Arc<StreamDef>),
56 StructDef(Arc<VmStructDef>),
58 StructInstance(Arc<VmStructInstance>),
60 EnumDef(Arc<VmEnumDef>),
62 EnumInstance(Arc<VmEnumInstance>),
64 Module(Arc<VmModule>),
66 Map(Box<Vec<(Arc<str>, VmValue)>>),
68 Task(Arc<VmTask>),
70 Channel(Arc<VmChannel>),
72 Generator(Arc<Mutex<VmGenerator>>),
74 Set(Box<Vec<VmValue>>),
76 Decimal(rust_decimal::Decimal),
78 DateTime(i64),
80 Secret(Arc<str>),
82 #[cfg(feature = "python")]
84 PyObject(Arc<crate::python::PyObjectWrapper>),
85 #[cfg(feature = "gpu")]
87 GpuTensor(Arc<GpuTensor>),
88 #[cfg(feature = "native")]
90 AgentDef(Arc<AgentDef>),
91 #[cfg(feature = "mcp")]
93 McpClient(Arc<tl_mcp::McpClient>),
94 Moved,
96 Ref(Arc<VmValue>),
98}
99
100impl PartialEq for VmValue {
101 fn eq(&self, other: &Self) -> bool {
102 match (self, other) {
103 (VmValue::Int(a), VmValue::Int(b)) => a == b,
104 (VmValue::Float(a), VmValue::Float(b)) => a == b,
105 (VmValue::String(a), VmValue::String(b)) => a == b,
106 (VmValue::Bool(a), VmValue::Bool(b)) => a == b,
107 (VmValue::None, VmValue::None) => true,
108 (VmValue::Decimal(a), VmValue::Decimal(b)) => a == b,
109 (VmValue::DateTime(a), VmValue::DateTime(b)) => a == b,
110 (VmValue::DateTime(a), VmValue::Int(b)) | (VmValue::Int(a), VmValue::DateTime(b)) => {
111 a == b
112 }
113 (VmValue::List(a), VmValue::List(b)) => a == b,
114 (VmValue::Map(a), VmValue::Map(b)) => a == b,
115 (VmValue::Set(a), VmValue::Set(b)) => a == b,
116 (VmValue::Ref(a), VmValue::Ref(b)) => a == b,
117 (VmValue::Ref(inner), other) | (other, VmValue::Ref(inner)) => inner.as_ref() == other,
118 _ => false,
119 }
120 }
121}
122
123#[derive(Debug, Clone)]
125pub struct VmStructDef {
126 pub name: Arc<str>,
127 pub fields: Vec<Arc<str>>,
128}
129
130#[derive(Debug, Clone)]
132pub struct VmStructInstance {
133 pub type_name: Arc<str>,
134 pub fields: Vec<(Arc<str>, VmValue)>,
135}
136
137#[derive(Debug, Clone)]
139pub struct VmEnumDef {
140 pub name: Arc<str>,
141 pub variants: Vec<(Arc<str>, usize)>, }
143
144#[derive(Debug, Clone)]
146pub struct VmEnumInstance {
147 pub type_name: Arc<str>,
148 pub variant: Arc<str>,
149 pub fields: Vec<VmValue>,
150}
151
152#[derive(Debug, Clone)]
154pub struct VmModule {
155 pub name: Arc<str>,
156 pub exports: std::collections::HashMap<String, VmValue>,
157}
158
159static TASK_COUNTER: AtomicU64 = AtomicU64::new(1);
161static CHANNEL_COUNTER: AtomicU64 = AtomicU64::new(1);
163
164pub struct VmTask {
171 pub receiver: Mutex<Option<mpsc::Receiver<Result<VmValue, String>>>>,
172 pub id: u64,
173}
174
175impl VmTask {
176 pub fn new(receiver: mpsc::Receiver<Result<VmValue, String>>) -> Self {
177 VmTask {
178 receiver: Mutex::new(Some(receiver)),
179 id: TASK_COUNTER.fetch_add(1, Ordering::Relaxed),
180 }
181 }
182}
183
184impl fmt::Debug for VmTask {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 write!(f, "<task {}>", self.id)
187 }
188}
189
190pub struct VmChannel {
192 pub sender: mpsc::SyncSender<VmValue>,
193 pub receiver: Arc<Mutex<mpsc::Receiver<VmValue>>>,
194 pub id: u64,
195}
196
197impl VmChannel {
198 pub fn new(capacity: usize) -> Self {
199 let (tx, rx) = mpsc::sync_channel(capacity);
200 VmChannel {
201 sender: tx,
202 receiver: Arc::new(Mutex::new(rx)),
203 id: CHANNEL_COUNTER.fetch_add(1, Ordering::Relaxed),
204 }
205 }
206}
207
208impl Clone for VmChannel {
209 fn clone(&self) -> Self {
210 VmChannel {
211 sender: self.sender.clone(),
212 receiver: self.receiver.clone(),
213 id: self.id,
214 }
215 }
216}
217
218impl fmt::Debug for VmChannel {
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 write!(f, "<channel {}>", self.id)
221 }
222}
223
224static GENERATOR_COUNTER: AtomicU64 = AtomicU64::new(1);
226
227pub enum GeneratorKind {
229 UserDefined {
231 prototype: Arc<Prototype>,
232 upvalues: Vec<UpvalueRef>,
233 saved_stack: Vec<VmValue>,
234 ip: usize,
235 },
236 ListIter { items: Vec<VmValue>, index: usize },
238 Take {
240 source: Arc<Mutex<VmGenerator>>,
241 remaining: usize,
242 },
243 Skip {
245 source: Arc<Mutex<VmGenerator>>,
246 remaining: usize,
247 },
248 Map {
250 source: Arc<Mutex<VmGenerator>>,
251 func: VmValue,
252 },
253 Filter {
255 source: Arc<Mutex<VmGenerator>>,
256 func: VmValue,
257 },
258 Chain {
260 first: Arc<Mutex<VmGenerator>>,
261 second: Arc<Mutex<VmGenerator>>,
262 on_second: bool,
263 },
264 Zip {
266 first: Arc<Mutex<VmGenerator>>,
267 second: Arc<Mutex<VmGenerator>>,
268 },
269 Enumerate {
271 source: Arc<Mutex<VmGenerator>>,
272 index: usize,
273 },
274}
275
276impl fmt::Debug for GeneratorKind {
277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278 match self {
279 GeneratorKind::UserDefined { .. } => write!(f, "UserDefined"),
280 GeneratorKind::ListIter { .. } => write!(f, "ListIter"),
281 GeneratorKind::Take { .. } => write!(f, "Take"),
282 GeneratorKind::Skip { .. } => write!(f, "Skip"),
283 GeneratorKind::Map { .. } => write!(f, "Map"),
284 GeneratorKind::Filter { .. } => write!(f, "Filter"),
285 GeneratorKind::Chain { .. } => write!(f, "Chain"),
286 GeneratorKind::Zip { .. } => write!(f, "Zip"),
287 GeneratorKind::Enumerate { .. } => write!(f, "Enumerate"),
288 }
289 }
290}
291
292pub struct VmGenerator {
302 pub kind: GeneratorKind,
303 pub done: bool,
304 pub id: u64,
305}
306
307impl VmGenerator {
308 pub fn new(kind: GeneratorKind) -> Self {
309 VmGenerator {
310 kind,
311 done: false,
312 id: GENERATOR_COUNTER.fetch_add(1, Ordering::Relaxed),
313 }
314 }
315}
316
317impl fmt::Debug for VmGenerator {
318 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319 write!(f, "<generator {}>", self.id)
320 }
321}
322
323#[derive(Debug)]
325pub struct VmClosure {
326 pub prototype: Arc<Prototype>,
327 pub upvalues: Vec<UpvalueRef>,
328}
329
330#[derive(Debug, Clone)]
332pub enum UpvalueRef {
333 Open { stack_index: usize },
335 Closed(VmValue),
337}
338
339#[cfg(feature = "native")]
341#[derive(Clone)]
342pub struct VmTable {
343 pub df: DataFrame,
344}
345
346#[cfg(feature = "native")]
347impl fmt::Debug for VmTable {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 write!(f, "<table>")
350 }
351}
352
353#[cfg(feature = "native")]
355#[derive(Debug, Clone)]
356pub struct VmSchema {
357 pub name: Arc<str>,
358 pub version: i64,
359 pub arrow_schema: Arc<ArrowSchema>,
360}
361
362impl VmValue {
363 pub fn is_truthy(&self) -> bool {
364 match self {
365 VmValue::Bool(b) => *b,
366 VmValue::Int(n) => *n != 0,
367 VmValue::Float(n) => *n != 0.0,
368 VmValue::String(s) => !s.is_empty(),
369 VmValue::List(items) => !items.is_empty(),
370 VmValue::Map(pairs) => !pairs.is_empty(),
371 VmValue::Set(items) => !items.is_empty(),
372 VmValue::None => false,
373 VmValue::Decimal(d) => !d.is_zero(),
374 #[cfg(feature = "python")]
375 VmValue::PyObject(_) => true,
376 #[cfg(feature = "gpu")]
377 VmValue::GpuTensor(_) => true,
378 VmValue::Moved => false,
379 VmValue::Ref(inner) => inner.is_truthy(),
380 _ => true,
381 }
382 }
383
384 pub fn type_name(&self) -> &'static str {
385 match self {
386 VmValue::Int(_) => "int64",
387 VmValue::Float(_) => "float64",
388 VmValue::String(_) => "string",
389 VmValue::Bool(_) => "bool",
390 VmValue::List(_) => "list",
391 VmValue::None => "none",
392 VmValue::Function(_) => "function",
393 VmValue::Builtin(_) => "builtin",
394 #[cfg(feature = "native")]
395 VmValue::Table(_) => "table",
396 #[cfg(feature = "native")]
397 VmValue::Schema(_) => "schema",
398 #[cfg(feature = "native")]
399 VmValue::Tensor(_) => "tensor",
400 #[cfg(feature = "native")]
401 VmValue::Model(_) => "model",
402 #[cfg(feature = "native")]
403 VmValue::Connector(_) => "connector",
404 #[cfg(feature = "native")]
405 VmValue::PipelineResult(_) => "pipeline_result",
406 #[cfg(feature = "native")]
407 VmValue::PipelineDef(_) => "pipeline",
408 #[cfg(feature = "native")]
409 VmValue::StreamDef(_) => "stream",
410 VmValue::StructDef(_) => "struct_def",
411 VmValue::StructInstance(_) => "struct",
412 VmValue::EnumDef(_) => "enum_def",
413 VmValue::EnumInstance(_) => "enum",
414 VmValue::Module(_) => "module",
415 VmValue::Map(_) => "map",
416 VmValue::Set(_) => "set",
417 VmValue::Task(_) => "task",
418 VmValue::Channel(_) => "channel",
419 VmValue::Generator(_) => "generator",
420 VmValue::Decimal(_) => "decimal",
421 VmValue::DateTime(_) => "datetime",
422 VmValue::Secret(_) => "secret",
423 #[cfg(feature = "python")]
424 VmValue::PyObject(_) => "pyobject",
425 #[cfg(feature = "gpu")]
426 VmValue::GpuTensor(_) => "gpu_tensor",
427 #[cfg(feature = "native")]
428 VmValue::AgentDef(_) => "agent",
429 #[cfg(feature = "mcp")]
430 VmValue::McpClient(_) => "mcp_client",
431 VmValue::Moved => "<moved>",
432 VmValue::Ref(inner) => inner.type_name(),
433 }
434 }
435}
436
437impl fmt::Debug for VmValue {
438 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
439 match self {
440 VmValue::Int(n) => write!(f, "Int({n})"),
441 VmValue::Float(n) => write!(f, "Float({n})"),
442 VmValue::String(s) => write!(f, "String({s:?})"),
443 VmValue::Bool(b) => write!(f, "Bool({b})"),
444 VmValue::None => write!(f, "None"),
445 VmValue::List(items) => write!(f, "List({items:?})"),
446 VmValue::Function(c) => write!(f, "<fn {}>", c.prototype.name),
447 VmValue::Builtin(id) => write!(f, "<builtin {}>", id.name()),
448 #[cfg(feature = "native")]
449 VmValue::Table(_) => write!(f, "<table>"),
450 #[cfg(feature = "native")]
451 VmValue::Schema(s) => write!(f, "<schema {}>", s.name),
452 #[cfg(feature = "native")]
453 VmValue::Tensor(t) => write!(f, "Tensor({t:?})"),
454 #[cfg(feature = "native")]
455 VmValue::Model(m) => write!(f, "Model({m:?})"),
456 #[cfg(feature = "native")]
457 VmValue::Connector(c) => write!(f, "<connector {}>", c.name),
458 #[cfg(feature = "native")]
459 VmValue::PipelineResult(r) => write!(f, "{r:?}"),
460 #[cfg(feature = "native")]
461 VmValue::PipelineDef(p) => write!(f, "<pipeline {}>", p.name),
462 #[cfg(feature = "native")]
463 VmValue::StreamDef(s) => write!(f, "<stream {}>", s.name),
464 VmValue::StructDef(d) => write!(f, "<struct {}>", d.name),
465 VmValue::StructInstance(s) => {
466 write!(f, "{} {{ ", s.type_name)?;
467 for (i, (k, v)) in s.fields.iter().enumerate() {
468 if i > 0 {
469 write!(f, ", ")?;
470 }
471 write!(f, "{k}: {v:?}")?;
472 }
473 write!(f, " }}")
474 }
475 VmValue::EnumDef(d) => write!(f, "<enum {}>", d.name),
476 VmValue::EnumInstance(e) => {
477 write!(f, "{}::{}", e.type_name, e.variant)?;
478 if !e.fields.is_empty() {
479 write!(f, "({:?})", e.fields)?;
480 }
481 Ok(())
482 }
483 VmValue::Module(m) => write!(f, "<module {}>", m.name),
484 VmValue::Map(pairs) => {
485 write!(f, "Map{{")?;
486 for (i, (k, v)) in pairs.iter().enumerate() {
487 if i > 0 {
488 write!(f, ", ")?;
489 }
490 write!(f, "{k:?}: {v:?}")?;
491 }
492 write!(f, "}}")
493 }
494 VmValue::Task(t) => write!(f, "<task {}>", t.id),
495 VmValue::Channel(c) => write!(f, "<channel {}>", c.id),
496 VmValue::Generator(g) => {
497 let guard = g.lock().unwrap();
498 write!(f, "<generator {}>", guard.id)
499 }
500 VmValue::Set(items) => {
501 write!(f, "Set{{")?;
502 for (i, v) in items.iter().enumerate() {
503 if i > 0 {
504 write!(f, ", ")?;
505 }
506 write!(f, "{v:?}")?;
507 }
508 write!(f, "}}")
509 }
510 VmValue::Decimal(d) => write!(f, "Decimal({d})"),
511 VmValue::DateTime(ms) => {
512 use chrono::TimeZone;
513 let secs = *ms / 1000;
514 let nsecs = ((*ms % 1000) * 1_000_000) as u32;
515 match chrono::Utc.timestamp_opt(secs, nsecs).single() {
516 Some(dt) => write!(f, "DateTime({})", dt.format("%Y-%m-%d %H:%M:%S UTC")),
517 None => write!(f, "DateTime({ms}ms)"),
518 }
519 }
520 VmValue::Secret(_) => write!(f, "Secret(***)"),
521 #[cfg(feature = "python")]
522 VmValue::PyObject(w) => write!(f, "PyObject({w:?})"),
523 #[cfg(feature = "gpu")]
524 VmValue::GpuTensor(t) => write!(f, "{t:?}"),
525 #[cfg(feature = "native")]
526 VmValue::AgentDef(a) => write!(f, "AgentDef({})", a.name),
527 #[cfg(feature = "mcp")]
528 VmValue::McpClient(_) => write!(f, "<mcp_client>"),
529 VmValue::Moved => write!(f, "<moved>"),
530 VmValue::Ref(inner) => write!(f, "&{inner:?}"),
531 }
532 }
533}
534
535impl fmt::Display for VmValue {
536 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537 match self {
538 VmValue::Int(n) => write!(f, "{n}"),
539 VmValue::Float(n) => {
540 if n.fract() == 0.0 {
541 write!(f, "{n:.1}")
542 } else {
543 write!(f, "{n}")
544 }
545 }
546 VmValue::String(s) => write!(f, "{s}"),
547 VmValue::Bool(b) => write!(f, "{b}"),
548 VmValue::None => write!(f, "none"),
549 VmValue::List(items) => {
550 write!(f, "[")?;
551 for (i, item) in items.iter().enumerate() {
552 if i > 0 {
553 write!(f, ", ")?;
554 }
555 write!(f, "{item}")?;
556 }
557 write!(f, "]")
558 }
559 VmValue::Function(c) => write!(f, "<fn {}>", c.prototype.name),
560 VmValue::Builtin(id) => write!(f, "<builtin {}>", id.name()),
561 #[cfg(feature = "native")]
562 VmValue::Table(_) => write!(f, "<table>"),
563 #[cfg(feature = "native")]
564 VmValue::Schema(s) => write!(f, "<schema {}>", s.name),
565 #[cfg(feature = "native")]
566 VmValue::Tensor(t) => write!(f, "{t}"),
567 #[cfg(feature = "native")]
568 VmValue::Model(m) => write!(f, "{m}"),
569 #[cfg(feature = "native")]
570 VmValue::Connector(c) => write!(f, "{c}"),
571 #[cfg(feature = "native")]
572 VmValue::PipelineResult(r) => write!(f, "{r}"),
573 #[cfg(feature = "native")]
574 VmValue::PipelineDef(p) => write!(f, "{p}"),
575 #[cfg(feature = "native")]
576 VmValue::StreamDef(s) => write!(f, "{s}"),
577 VmValue::StructDef(d) => write!(f, "<struct {}>", d.name),
578 VmValue::StructInstance(s) => {
579 write!(f, "{} {{ ", s.type_name)?;
580 for (i, (k, v)) in s.fields.iter().enumerate() {
581 if i > 0 {
582 write!(f, ", ")?;
583 }
584 write!(f, "{k}: {v}")?;
585 }
586 write!(f, " }}")
587 }
588 VmValue::EnumDef(d) => write!(f, "<enum {}>", d.name),
589 VmValue::EnumInstance(e) => {
590 write!(f, "{}::{}", e.type_name, e.variant)?;
591 if !e.fields.is_empty() {
592 write!(f, "(")?;
593 for (i, v) in e.fields.iter().enumerate() {
594 if i > 0 {
595 write!(f, ", ")?;
596 }
597 write!(f, "{v}")?;
598 }
599 write!(f, ")")?;
600 }
601 Ok(())
602 }
603 VmValue::Module(m) => write!(f, "<module {}>", m.name),
604 VmValue::Map(pairs) => {
605 write!(f, "{{")?;
606 for (i, (k, v)) in pairs.iter().enumerate() {
607 if i > 0 {
608 write!(f, ", ")?;
609 }
610 write!(f, "{k}: {v}")?;
611 }
612 write!(f, "}}")
613 }
614 VmValue::Task(t) => write!(f, "<task {}>", t.id),
615 VmValue::Channel(c) => write!(f, "<channel {}>", c.id),
616 VmValue::Generator(g) => {
617 let guard = g.lock().unwrap();
618 write!(f, "<generator {}>", guard.id)
619 }
620 VmValue::Set(items) => {
621 write!(f, "{{")?;
622 for (i, v) in items.iter().enumerate() {
623 if i > 0 {
624 write!(f, ", ")?;
625 }
626 write!(f, "{v}")?;
627 }
628 write!(f, "}}")
629 }
630 VmValue::Decimal(d) => write!(f, "{d}"),
631 VmValue::DateTime(ms) => {
632 use chrono::TimeZone;
633 let secs = *ms / 1000;
634 let nsecs = ((*ms % 1000) * 1_000_000) as u32;
635 match chrono::Utc.timestamp_opt(secs, nsecs).single() {
636 Some(dt) => write!(f, "{}", dt.format("%Y-%m-%d %H:%M:%S")),
637 None => write!(f, "<datetime {ms}ms>"),
638 }
639 }
640 VmValue::Secret(_) => write!(f, "***"),
641 #[cfg(feature = "python")]
642 VmValue::PyObject(w) => write!(f, "{w}"),
643 #[cfg(feature = "gpu")]
644 VmValue::GpuTensor(t) => write!(f, "{t}"),
645 #[cfg(feature = "native")]
646 VmValue::AgentDef(a) => write!(f, "<agent {}>", a.name),
647 #[cfg(feature = "mcp")]
648 VmValue::McpClient(_) => write!(f, "<mcp_client>"),
649 VmValue::Moved => write!(f, "<moved>"),
650 VmValue::Ref(inner) => write!(f, "{inner}"),
651 }
652 }
653}