mlua-swarm 0.1.3

Swarm engine host built on mlua — long-running stateful runtime with Role/Verb gate, CapToken, 3-stage pipeline, and Middleware overlay.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
//! Blueprint `Compiler`, `CompiledAgentTable`, and the three default
//! `SpawnerFactory` implementations.
//!
//! ## Pipeline
//!
//! ```text
//! Blueprint (= flow + agents + hints + strategy + spawner_hints)
//!     │
//!     │ Compiler.compile(&bp)          ← this module (AgentDef → SpawnerAdapter table)
//!     ▼
//! CompiledBlueprint {
//!     router: Arc<CompiledAgentTable>, // ctx.agent → SpawnerAdapter lookup
//!     flow:   FlowNode,                // the flow.ir source (evaluated via EngineDispatcher)
//!     metadata: BlueprintMetadata,
//! }
//!     │
//!     │ service::linker::link(router, blueprint.spawner_hints.layers, &engine)
//!     ▼                                   ↑ Layer wrapping is done separately (src/service/linker.rs)
//! `Arc<dyn SpawnerAdapter>`            (already wrapped with base + hint SpawnerLayers)
//!     │
//!     ▼ EngineDispatcher::with_spawner → engine.dispatch_attempt_with
//! ```
//!
//! `CompiledAgentTable` is a thin table: it looks up `routes[name]` by
//! `ctx.agent` and hands the spawn off to the matching `SpawnerAdapter`.
//! The `routes` map is built at compile time through `SpawnerFactory`
//! implementations. Layer wrapping is not part of this module — it lives
//! in `service::linker::link`.

use crate::blueprint::{AgentDef, AgentKind, Blueprint, BlueprintMetadata};
use crate::core::ctx::Ctx;
use crate::core::engine::Engine;
use crate::operator::{Operator, OperatorSpawner};
use crate::types::{CapToken, TaskId};
use crate::worker::adapter::{InProcSpawner, SpawnError, SpawnerAdapter, WorkerFn};
use crate::worker::process_spawner::{ProcessSpawner, StreamMode};
use crate::worker::Worker;
use async_trait::async_trait;
use mlua_flow_ir::Node as FlowNode;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;

// ─── error ───────────────────────────────────────────────────────────────

/// Failure modes of `Compiler::compile` when lowering a `Blueprint` into a
/// `CompiledBlueprint`.
#[derive(Debug, Error)]
pub enum CompileError {
    /// No factory in the `SpawnerRegistry` handles this `AgentDef.kind`,
    /// and the blueprint's `strategy.strict_kind` forbids skipping it.
    #[error("unknown agent kind in SpawnerRegistry: {0:?}")]
    UnknownKind(AgentKind),
    /// The factory for the agent's kind rejected its `AgentDef.spec`
    /// (a required field is missing, or a field has the wrong type).
    #[error("agent '{name}' spec invalid: {msg}")]
    InvalidSpec {
        /// Name of the agent whose spec was rejected.
        name: String,
        /// Explanation of what the factory found wrong.
        msg: String,
    },
    /// A flow `Step` names an agent that has neither an `AgentDef` route
    /// nor a default spawner to fall back on.
    #[error("flow references agent '{0}' but no AgentDef matches")]
    UnresolvedRef(String),
    /// The same agent name appears on more than one `AgentDef`.
    #[error("duplicate AgentDef name: {0}")]
    DuplicateAgent(String),
    /// An Operator-kind agent's `spec.operator_ref` names no entry of
    /// `Blueprint.operators`.
    #[error("agent '{agent}' operator_ref '{op_ref}' does not match any OperatorDef.name in Blueprint.operators (defined: {defined:?})")]
    UnresolvedOperatorRef {
        /// Agent carrying the dangling `operator_ref`.
        agent: String,
        /// The `operator_ref` string that failed to resolve.
        op_ref: String,
        /// Every `OperatorDef.name` the blueprint does declare (reported
        /// so the message shows the valid choices).
        defined: Vec<String>,
    },
}

// ─── SpawnerFactory + Registry ───────────────────────────────────────────

/// Builds the concrete `SpawnerAdapter` for a single `AgentDef`.
///
/// Register one factory per `AgentKind`. The factory owns everything
/// kind-specific: parsing the spec, validating it, and baking in the
/// profile.
///
/// Since v9, `build` receives the whole `&AgentDef` instead of the older
/// `(name, spec, hint)` triple, so that the profile can be passed through.
/// Most implementations still read only `agent_def.name` and
/// `agent_def.spec`. Operator-backend factories additionally consume
/// `agent_def.profile` to bake the persona in.
pub trait SpawnerFactory: Send + Sync {
    /// Produce the `SpawnerAdapter` that will serve `agent_def`.
    ///
    /// `hint` is the agent's entry in `Blueprint.hints.per_agent`, or
    /// `None` when the blueprint provides no hint for it.
    fn build(
        &self,
        agent_def: &AgentDef,
        hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError>;
}

/// Companion to [`SpawnerFactory`] that pins down, at the type level,
/// which `AgentKind` a factory serves.
///
/// [`SpawnerFactory`] itself has no associated const on purpose. That keeps
/// it dyn-compatible, so it can live behind `Arc<dyn SpawnerFactory>`.
/// This trait carries the `const KIND: AgentKind` instead, and
/// [`SpawnerRegistry::register`] keys its `HashMap` with `F::KIND`. The
/// caller therefore never spells out a kind literal, and the
/// factory ↔ kind pairing cannot drift.
///
/// The built-in factories in this module
/// (`SubprocessProcessSpawnerFactory`, `LuaInProcessSpawnerFactory`,
/// `RustFnInProcessSpawnerFactory`) implement it. An extension backend
/// (for example a hypothetical `AgentBlockSpawnerFactory`) follows the
/// same two steps: add an `AgentKind` variant, then implement this trait.
pub trait SpawnerFactoryKind: SpawnerFactory {
    /// The kind this factory handles; `SpawnerRegistry::register` uses it
    /// as the registry key.
    const KIND: AgentKind;
    /// The concrete Worker type this kind produces.
    ///
    /// Declaring it on every factory makes the `AgentKind → Worker`
    /// mapping explicit end to end. The concrete type is preserved up to
    /// the point where `SpawnerAdapter::spawn` erases it into
    /// `Box<dyn Worker>`.
    type Worker: crate::worker::Worker;
}

/// Table from `AgentKind` to the `SpawnerFactory` that handles it.
/// `Compiler::compile` consults it once per `AgentDef`.
#[derive(Clone)]
pub struct SpawnerRegistry {
    factories: HashMap<AgentKind, Arc<dyn SpawnerFactory>>,
}

impl SpawnerRegistry {
    /// Create a registry that holds no factories yet.
    pub fn new() -> Self {
        let factories = HashMap::new();
        SpawnerRegistry { factories }
    }

    /// Register `factory` under its own `F::KIND` (type-driven
    /// registration).
    ///
    /// Callers write
    /// `reg.register::<SubprocessProcessSpawnerFactory>(Arc::new(...))`
    /// and never pass an `AgentKind` literal. The kind is taken from the
    /// type, so a factory cannot be filed under the wrong key.
    ///
    /// If a factory is already registered for the same kind, it is
    /// replaced. Returns `&mut Self` so calls can be chained.
    pub fn register<F: SpawnerFactoryKind + 'static>(&mut self, factory: Arc<F>) -> &mut Self {
        let kind = F::KIND;
        self.factories
            .insert(kind, factory as Arc<dyn SpawnerFactory>);
        self
    }
}

impl Default for SpawnerRegistry {
    /// Same as [`SpawnerRegistry::new`]: an empty registry.
    fn default() -> Self {
        SpawnerRegistry::new()
    }
}

// ─── Compiler ────────────────────────────────────────────────────────────

/// Lowers a `Blueprint` into a `CompiledBlueprint` by resolving each
/// `AgentDef` against a `SpawnerRegistry`.
///
/// A `Compiler` holds only the registry and an optional default spawner,
/// never any per-compile state. One instance can therefore be reused for
/// any number of `compile()` calls.
pub struct Compiler {
    registry: SpawnerRegistry,
    default_spawner: Option<Arc<dyn SpawnerAdapter>>,
}

/// Output of `Compiler::compile`: the agent routing table together with the
/// blueprint's flow and metadata, passed through unchanged.
///
/// It is ready to hand to `EngineDispatcher::with_spawner` /
/// `mlua_flow_ir::eval_async`.
pub struct CompiledBlueprint {
    /// Lookup table from `ctx.agent` to the `SpawnerAdapter` serving it.
    pub router: Arc<CompiledAgentTable>,
    /// The flow.ir tree, cloned as-is from `Blueprint.flow`.
    pub flow: FlowNode,
    /// Cloned as-is from `Blueprint.metadata`.
    pub metadata: BlueprintMetadata,
}

impl Compiler {
    /// Build a `Compiler` around `registry` with no default spawner.
    ///
    /// Without a default, a flow ref that matches no routed agent is an
    /// error when `strategy.strict_refs` is set. Chain
    /// [`Compiler::with_default`] to supply a fallback.
    pub fn new(registry: SpawnerRegistry) -> Self {
        Self {
            registry,
            default_spawner: None,
        }
    }

    /// Set a fallback spawner.
    ///
    /// The fallback is stored in the compiled routing table and serves any
    /// agent name without its own route. That covers flow refs with no
    /// `AgentDef`, and agents whose `AgentKind` has no registered factory
    /// under a non-strict strategy.
    pub fn with_default(mut self, sp: Arc<dyn SpawnerAdapter>) -> Self {
        self.default_spawner = Some(sp);
        self
    }

    /// Resolve every `Blueprint.agents` entry through the registry and
    /// return the routing table alongside the untouched flow and metadata.
    ///
    /// Steps:
    /// 1. Every `kind = Operator` agent that has a `spec.operator_ref` must
    ///    name one of `bp.operators[].name`.
    /// 2. Each agent is built by the factory registered for its kind. When
    ///    no factory exists, the agent is rejected (`strategy.strict_kind`)
    ///    or dropped from the table with a warning.
    /// 3. With `strategy.strict_refs`, every flow `Step.ref` must resolve to
    ///    a route, unless a default spawner is configured.
    ///
    /// # Errors
    /// - [`CompileError::UnresolvedOperatorRef`]: an `operator_ref` matches
    ///   no declared operator.
    /// - [`CompileError::DuplicateAgent`]: two agents share a name.
    /// - [`CompileError::UnknownKind`]: a kind has no factory under
    ///   `strict_kind`.
    /// - Any error returned by a factory's `build`.
    /// - [`CompileError::UnresolvedRef`]: a flow ref cannot be routed under
    ///   `strict_refs`.
    pub fn compile(&self, bp: &Blueprint) -> Result<CompiledBlueprint, CompileError> {
        use std::collections::HashSet;

        // Design-time validation (OperatorDef as a first-class value):
        // every `kind = Operator` agent's `spec.operator_ref` must point at
        // one of `bp.operators[].name`. A Blueprint with any Operator agent
        // must therefore declare its operators up front; the empty-operators
        // backward-compat bypass is retired.
        let defined: Vec<String> = bp.operators.iter().map(|o| o.name.clone()).collect();
        for ad in &bp.agents {
            if !matches!(ad.kind, AgentKind::Operator) {
                continue;
            }
            // A missing `op_ref` is reported through OperatorSpawnerFactory.build
            // under a different error, so only present refs are checked here.
            if let Some(op_ref) = ad.spec.get("operator_ref").and_then(|v| v.as_str()) {
                if !defined.iter().any(|n| n == op_ref) {
                    return Err(CompileError::UnresolvedOperatorRef {
                        agent: ad.name.clone(),
                        op_ref: op_ref.to_string(),
                        defined: defined.clone(),
                    });
                }
            }
        }

        let mut routes: HashMap<String, Arc<dyn SpawnerAdapter>> =
            HashMap::with_capacity(bp.agents.len());
        // Names are borrowed from `bp`. `insert` returns `false` when the
        // name was already present, so duplicate detection is a single
        // lookup with no per-agent `String` clone.
        let mut seen: HashSet<&str> = HashSet::with_capacity(bp.agents.len());

        for ad in &bp.agents {
            // Checked before the factory lookup, so duplicates are caught
            // even for agents that end up dropped below.
            if !seen.insert(ad.name.as_str()) {
                return Err(CompileError::DuplicateAgent(ad.name.clone()));
            }

            let factory = match self.registry.factories.get(&ad.kind) {
                Some(f) => f,
                None if bp.strategy.strict_kind => {
                    return Err(CompileError::UnknownKind(ad.kind.clone()));
                }
                None => {
                    tracing::warn!(
                        agent = %ad.name,
                        kind = ?ad.kind,
                        "no spawner factory registered for agent kind; \
                         dropping agent from routing table (strict_kind=false)"
                    );
                    continue;
                }
            };
            let hint = bp.hints.per_agent.get(&ad.name);
            let spawner = factory.build(ad, hint)?;
            routes.insert(ad.name.clone(), spawner);
        }

        if bp.strategy.strict_refs {
            verify_refs(&bp.flow, &routes, self.default_spawner.is_some())?;
        }

        let router = Arc::new(CompiledAgentTable {
            routes,
            default: self.default_spawner.clone(),
        });
        Ok(CompiledBlueprint {
            router,
            flow: bp.flow.clone(),
            metadata: bp.metadata.clone(),
        })
    }
}

/// Walk the flow `Node`, collect every `Step.ref`, and check that no ref
/// is unresolved against `routes` (or the default, when one exists).
fn verify_refs(
    node: &FlowNode,
    routes: &HashMap<String, Arc<dyn SpawnerAdapter>>,
    has_default: bool,
) -> Result<(), CompileError> {
    let mut refs: Vec<String> = Vec::new();
    collect_refs(node, &mut refs);
    for r in refs {
        if !routes.contains_key(&r) && !has_default {
            return Err(CompileError::UnresolvedRef(r));
        }
    }
    Ok(())
}

/// Append the `ref_` of every `Step` reachable from `node` to `out`.
///
/// Traversal is depth-first and left-to-right (`then_` before `else_`,
/// `body` before `catch`). Duplicate refs are kept.
fn collect_refs(node: &FlowNode, out: &mut Vec<String>) {
    match node {
        FlowNode::Step { ref_, .. } => out.push(ref_.to_owned()),
        FlowNode::Seq { children } => children
            .iter()
            .for_each(|child| collect_refs(child, out)),
        FlowNode::Branch { then_, else_, .. } => {
            collect_refs(then_, out);
            collect_refs(else_, out);
        }
        FlowNode::Fanout { body, .. } => collect_refs(body, out),
        FlowNode::Loop { body, .. } => collect_refs(body, out),
        FlowNode::Try { body, catch, .. } => {
            collect_refs(body, out);
            collect_refs(catch, out);
        }
        // `Assign` carries no agent ref, so there is nothing to collect.
        FlowNode::Assign { .. } => {}
    }
}

// ─── CompiledAgentTable ───────────────────────────────────────────────────────

/// The compile result: an `agent name → SpawnerAdapter` lookup table.
///
/// `spawn` looks up `routes` by `ctx.agent` (the flow.ir `Step.ref`) and
/// delegates to the matching `SpawnerAdapter`. An unregistered name falls
/// back to `default` when one is configured. Without a default, the call
/// fails with `SpawnError::NotRegistered`.
///
/// Layer wrapping (`AuditMiddleware` / `MainAIMiddleware` and friends) is
/// not this type's concern; that is done separately in
/// `service::linker::link`.
pub struct CompiledAgentTable {
    pub(crate) routes: HashMap<String, Arc<dyn SpawnerAdapter>>,
    pub(crate) default: Option<Arc<dyn SpawnerAdapter>>,
}

impl CompiledAgentTable {
    /// Whether `agent` has its own resolved route in the table.
    ///
    /// The default spawner is not consulted: a name that would only be
    /// served by the fallback returns `false`.
    pub fn has_route(&self, agent: &str) -> bool {
        self.routes.contains_key(agent)
    }

    /// Every agent name with its own resolved route, sorted.
    ///
    /// Sorting gives a deterministic order (the backing `HashMap` iterates
    /// in an arbitrary, run-dependent order), which keeps logs and
    /// comparisons stable.
    pub fn routed_agents(&self) -> Vec<String> {
        let mut names: Vec<String> = self.routes.keys().cloned().collect();
        names.sort_unstable();
        names
    }
}

#[async_trait]
impl SpawnerAdapter for CompiledAgentTable {
    /// Route the spawn to the adapter registered for `ctx.agent`, falling
    /// back to the default adapter.
    ///
    /// # Errors
    /// `SpawnError::NotRegistered(ctx.agent)` when neither a route nor a
    /// default exists. Otherwise, whatever the selected adapter returns.
    async fn spawn(
        &self,
        engine: &Engine,
        ctx: &Ctx,
        task_id: TaskId,
        attempt: u32,
        token: CapToken,
    ) -> Result<Box<dyn Worker>, SpawnError> {
        // Borrow the adapter from `self`; no refcount bump is needed
        // because `&self` outlives the awaited spawn.
        let sp = self
            .routes
            .get(&ctx.agent)
            .or(self.default.as_ref())
            .ok_or_else(|| SpawnError::NotRegistered(ctx.agent.clone()))?;
        sp.spawn(engine, ctx, task_id, attempt, token).await
    }
}

// ─── default factories (three variants) ───────────────────────────────────

/// [`SpawnerFactory`] for `AgentKind::Subprocess`: builds a
/// [`ProcessSpawner`] from the agent's spec.
///
/// Naming follows `<WorkerIMPL><AdapterType>SpawnerFactory`. Each factory
/// name carries both the worker implementation and the host adapter, so
/// the two cannot be confused. This type was formerly
/// `ShellSpawnerFactory`.
///
/// Spec shape:
/// ```jsonc
/// { "program": "agent-block", "args": ["-s","s.lua"],
///   "use_stdin": true,                       // optional, default = true
///   "stream_mode": "ndjson_lines" | "sse_events" | "length_prefixed" | null  // optional, default = null (plain)
/// }
/// ```
pub struct SubprocessProcessSpawnerFactory;

impl SpawnerFactoryKind for SubprocessProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::Subprocess;
    type Worker = crate::worker::process_spawner::ProcessWorker;
}

impl SpawnerFactory for SubprocessProcessSpawnerFactory {
    fn build(
        &self,
        agent_def: &AgentDef,
        _hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
        let agent_name = &agent_def.name;
        let spec = &agent_def.spec;
        let invalid = |msg: String| CompileError::InvalidSpec {
            name: agent_name.to_string(),
            msg,
        };
        let program = spec
            .get("program")
            .and_then(|v| v.as_str())
            .ok_or_else(|| invalid("shell spec: 'program' (string) required".into()))?
            .to_string();
        let args: Vec<String> = spec
            .get("args")
            .and_then(|v| v.as_array())
            .map(|a| {
                a.iter()
                    .filter_map(|x| x.as_str().map(|s| s.to_string()))
                    .collect()
            })
            .unwrap_or_default();
        let use_stdin = spec
            .get("use_stdin")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);
        let stream_mode = match spec.get("stream_mode").and_then(|v| v.as_str()) {
            Some("ndjson_lines") => Some(StreamMode::NdjsonLines),
            Some("sse_events") => Some(StreamMode::SseEvents),
            Some("length_prefixed") => Some(StreamMode::LengthPrefixed),
            Some(other) => return Err(invalid(format!("unknown stream_mode: {other}"))),
            None => None,
        };

        let mut sp = ProcessSpawner {
            program,
            args,
            use_stdin,
            stream_mode,
        };
        if let Some(mode) = sp.stream_mode.clone() {
            sp = sp.stream_mode(mode);
        }
        Ok(Arc::new(sp))
    }
}

/// [`SpawnerFactory`] for `AgentKind::Lua`.
///
/// At `build` time, `spec.fn_id` is resolved against the scripts
/// registered with [`LuaInProcessSpawnerFactory::register_lua`]. The result
/// is an [`InProcSpawner`] that runs that Lua-eval `WorkerFn` under the
/// agent's name, one `InProcSpawner` instance per agent.
///
/// Naming follows `<WorkerIMPL><AdapterType>SpawnerFactory` (Lua worker on
/// the InProcess adapter). Together with `RustFnInProcessSpawnerFactory`,
/// it replaces the old combined `InProcSpawnerFactory`.
///
/// Spec shape:
/// ```jsonc
/// { "fn_id": "patch-spawner" }     // Lua source id pre-registered with the factory
/// ```
pub struct LuaInProcessSpawnerFactory {
    registry: HashMap<String, WorkerFn>,
    bridges: HashMap<String, HostBridge>,
}

/// A Rust function exposed to Lua scripts as `host.<name>(arg_table)`.
///
/// Both the argument and the result cross the boundary as
/// `serde_json::Value`. The bridge is synchronous. An implementation that
/// needs async Rust has to block on it itself (typically with
/// `tokio::runtime::Handle::current().block_on(...)`).
///
/// Design intent: Lua scripts stay focused on flow control and walking
/// `ctx`, while heavy lifting (LLM calls, RFC 6902 apply, verifiers and so
/// on) stays in Rust. Dropping the bridge in favour of "pure Lua" is
/// deferred to future work.
#[derive(Clone)]
pub struct HostBridge(
    Arc<dyn Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync>,
);

impl HostBridge {
    /// Wrap closure `f` so Lua scripts can call it.
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(serde_json::Value) -> Result<serde_json::Value, String> + Send + Sync + 'static,
    {
        let shared = Arc::new(f);
        Self(shared)
    }

    /// Invoke the wrapped closure with `arg` and return its result as-is.
    ///
    /// Production calls arrive through the Lua runtime. This method is
    /// public so unit tests can exercise the primitive directly.
    pub fn call(&self, arg: serde_json::Value) -> Result<serde_json::Value, String> {
        let HostBridge(inner) = self;
        inner(arg)
    }
}

/// An in-memory Lua script: the chunk text plus a label identifying it.
/// No filesystem path is involved.
///
/// Callers obtain the text however they like (`include_str!` and so on)
/// and register it with
/// [`LuaInProcessSpawnerFactory::register_lua`].
#[derive(Clone)]
pub struct LuaScriptSource {
    /// The Lua chunk text to evaluate.
    pub source: String,
    /// Name reported in error messages, usually the script's logical id
    /// (for example `"patch_spawner.lua"`).
    pub label: String,
}

impl LuaScriptSource {
    /// Pair a chunk `source` with the `label` used in its error messages.
    pub fn new(source: impl Into<String>, label: impl Into<String>) -> Self {
        let chunk: String = source.into();
        let name: String = label.into();
        LuaScriptSource {
            source: chunk,
            label: name,
        }
    }
}

impl LuaInProcessSpawnerFactory {
    /// Start with no registered scripts and no host bridges.
    pub fn new() -> Self {
        Self {
            registry: HashMap::new(),
            bridges: HashMap::new(),
        }
    }

    /// Register a host bridge. Subsequent `register_lua` calls snapshot
    /// the current bridge set.
    ///
    /// Ordering rule: register bridges first, then call `register_lua`;
    /// bridges added after `register_lua` will not be visible to that
    /// script.
    pub fn with_bridge(mut self, name: impl Into<String>, bridge: HostBridge) -> Self {
        self.bridges.insert(name.into(), bridge);
        self
    }

    /// Register a **Lua-eval Worker** under `fn_id`.
    ///
    /// Each dispatch spins up a fresh `mlua::Lua` VM, injects globals
    /// (`_PROMPT` / `_AGENT` / `_TASK_ID` / `_ATTEMPT` / `_CTX` — the last
    /// is `_PROMPT` parsed as JSON, or `nil` if that fails), evaluates
    /// the script, and marshals the returned table into a `WorkerResult`.
    ///
    /// Marshalling rules for the return value:
    /// - `{ value = ..., ok = bool }` → `WorkerResult.value` /
    ///   `WorkerResult.ok` verbatim.
    /// - Anything else → `value = <returned value>`, `ok = true`.
    ///
    /// Execution runs on `tokio::task::spawn_blocking` because `mlua::Lua`
    /// is `!Send` and needs to stay away from the tokio async context.
    /// Host bridges (the Lua-to-Rust callback path) previously registered
    /// with [`Self::with_bridge`] are snapshotted at call time and
    /// injected into every dispatch inside `run_lua_worker`.
    pub fn register_lua(mut self, fn_id: impl Into<String>, source: LuaScriptSource) -> Self {
        let source = Arc::new(source);
        let bridges = Arc::new(self.bridges.clone());
        let wrapped: WorkerFn = Arc::new(move |inv| {
            let source = source.clone();
            let bridges = bridges.clone();
            Box::pin(run_lua_worker(source, bridges, inv))
        });
        self.registry.insert(fn_id.into(), wrapped);
        self
    }
}

/// Run one Lua-eval worker invocation. This is the body behind every
/// `WorkerFn` created by `LuaInProcessSpawnerFactory::register_lua`.
///
/// Inside `tokio::task::spawn_blocking`, a fresh `mlua::Lua` VM is created.
/// The globals `_PROMPT`, `_AGENT`, `_TASK_ID` and `_ATTEMPT` are set,
/// plus `_CTX` when `_PROMPT` parses as JSON. Each entry of `bridges` is
/// exposed as `host.<name>`, and the chunk in `source` is evaluated.
///
/// The script's return value is converted to JSON and mapped to a
/// `WorkerResult`:
/// - An object containing a `value` and/or `ok` key is an envelope.
///   - `ok` comes from a boolean `ok` field and defaults to `true` when
///     that field is absent or not a boolean.
///   - `value` comes from the `value` field, or is the whole object when
///     there is no `value` key.
/// - Anything else becomes `value`, with `ok = true`.
///
/// # Errors
/// `WorkerError::Failed` when any of these happens:
/// - a global or host function cannot be installed;
/// - the script fails to evaluate;
/// - the return value cannot be converted to JSON;
/// - the blocking task fails to join.
async fn run_lua_worker(
    source: Arc<LuaScriptSource>,
    bridges: Arc<HashMap<String, HostBridge>>,
    inv: crate::worker::adapter::WorkerInvocation,
) -> Result<crate::worker::adapter::WorkerResult, crate::worker::adapter::WorkerError> {
    use crate::worker::adapter::WorkerError;
    use mlua::LuaSerdeExt;

    // Cloned before `source` moves into the closure, so the join-error
    // message can still name the script.
    let label = source.label.clone();
    let (value, ok) =
        tokio::task::spawn_blocking(move || -> Result<(serde_json::Value, bool), String> {
            // A new VM per dispatch: globals set here are never seen by
            // another invocation.
            let lua = mlua::Lua::new();
            let g = lua.globals();

            // 1. Base globals.
            g.set("_PROMPT", inv.prompt.clone())
                .map_err(|e| format!("set _PROMPT: {e}"))?;
            g.set("_AGENT", inv.agent.clone())
                .map_err(|e| format!("set _AGENT: {e}"))?;
            g.set("_TASK_ID", inv.task_id.to_string())
                .map_err(|e| format!("set _TASK_ID: {e}"))?;
            g.set("_ATTEMPT", inv.attempt as i64)
                .map_err(|e| format!("set _ATTEMPT: {e}"))?;

            // 2. _CTX = JSON parse(_PROMPT); nil on parse failure (co-exists with the plain-string prompt path).
            if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(&inv.prompt) {
                let lua_val = lua
                    .to_value(&json_val)
                    .map_err(|e| format!("_CTX to_value: {e}"))?;
                g.set("_CTX", lua_val)
                    .map_err(|e| format!("set _CTX: {e}"))?;
            }

            // 3. Inject the host bridge (Lua can call `host.<name>(arg)`).
            if !bridges.is_empty() {
                let host = lua
                    .create_table()
                    .map_err(|e| format!("create host table: {e}"))?;
                for (name, bridge) in bridges.iter() {
                    let bridge = bridge.clone();
                    let bname = name.clone();
                    let f = lua
                        .create_function(move |lua, arg: mlua::Value| {
                            let json_arg: serde_json::Value = lua.from_value(arg).map_err(|e| {
                                mlua::Error::external(format!("bridge {bname} arg → json: {e}"))
                            })?;
                            let result_json =
                                bridge.call(json_arg).map_err(mlua::Error::external)?;
                            lua.to_value(&result_json).map_err(|e| {
                                mlua::Error::external(format!("bridge {bname} ret → lua: {e}"))
                            })
                        })
                        .map_err(|e| format!("create_function {name}: {e}"))?;
                    host.set(name.as_str(), f)
                        .map_err(|e| format!("host.{name} set: {e}"))?;
                }
                g.set("host", host).map_err(|e| format!("set host: {e}"))?;
            }

            // 4. eval
            let result: mlua::Value = lua
                .load(&source.source)
                .set_name(&source.label)
                .eval()
                .map_err(|e| format!("lua eval [{}]: {e}", source.label))?;

            // 5. Marshal: shape `{ value=..., ok=true }` or raw value.
            let json_result: serde_json::Value = lua
                .from_value(result)
                .map_err(|e| format!("lua → json [{}]: {e}", source.label))?;

            // Match by value, so the envelope's `value` is moved out instead of
            // deep-cloning the whole result up front.
            let marshalled = match json_result {
                serde_json::Value::Object(mut map)
                    if map.contains_key("value") || map.contains_key("ok") =>
                {
                    let ok = map.get("ok").and_then(|v| v.as_bool()).unwrap_or(true);
                    match map.remove("value") {
                        Some(value) => (value, ok),
                        // Envelope without `value`: the whole table (including
                        // `ok`) is the value.
                        None => (serde_json::Value::Object(map), ok),
                    }
                }
                other => (other, true),
            };
            Ok(marshalled)
        })
        .await
        .map_err(|e| WorkerError::Failed(format!("spawn_blocking join [{label}]: {e}")))?
        .map_err(WorkerError::Failed)?;

    Ok(crate::worker::adapter::WorkerResult { value, ok })
}

impl Default for LuaInProcessSpawnerFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl SpawnerFactoryKind for LuaInProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::Lua;
    type Worker = LuaWorker;
}

impl SpawnerFactory for LuaInProcessSpawnerFactory {
    fn build(
        &self,
        agent_def: &AgentDef,
        _hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
        build_inproc_from_registry::<LuaWorker>(&self.registry, agent_def, "lua")
    }
}

/// [`SpawnerFactory`] for `AgentKind::RustFn`.
///
/// At `build` time, `spec.fn_id` is resolved against the closures
/// registered with [`RustFnInProcessSpawnerFactory::register_fn`]. The
/// result is an [`InProcSpawner`] that runs that Rust `WorkerFn` under the
/// agent's name.
///
/// Naming follows `<WorkerIMPL><AdapterType>SpawnerFactory` (RustFn worker
/// on the InProcess adapter). It is the sibling of
/// [`LuaInProcessSpawnerFactory`], which is the Lua-worker half of the same
/// split.
///
/// Spec shape:
/// ```jsonc
/// { "fn_id": "echo" }     // Rust closure id pre-registered with the factory
/// ```
pub struct RustFnInProcessSpawnerFactory {
    registry: HashMap<String, WorkerFn>,
}

impl RustFnInProcessSpawnerFactory {
    /// Create a factory with no registered closures.
    pub fn new() -> Self {
        let registry = HashMap::new();
        RustFnInProcessSpawnerFactory { registry }
    }

    /// Register async closure `f` under `fn_id`.
    ///
    /// Each dispatch calls `f(invocation)` and boxes the returned future so
    /// it fits the `WorkerFn` signature. Registering the same `fn_id` again
    /// replaces the earlier closure.
    pub fn register_fn<F, Fut>(mut self, fn_id: impl Into<String>, f: F) -> Self
    where
        F: Fn(crate::worker::adapter::WorkerInvocation) -> Fut + Send + Sync + 'static,
        Fut: std::future::Future<
                Output = Result<
                    crate::worker::adapter::WorkerResult,
                    crate::worker::adapter::WorkerError,
                >,
            > + Send
            + 'static,
    {
        // `f` is moved into the wrapper and called by reference. The future
        // it returns is `'static`, so no extra shared handle is needed per
        // call.
        let worker: WorkerFn = Arc::new(move |inv| Box::pin(f(inv)));
        self.registry.insert(fn_id.into(), worker);
        self
    }
}

impl Default for RustFnInProcessSpawnerFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl SpawnerFactoryKind for RustFnInProcessSpawnerFactory {
    const KIND: AgentKind = AgentKind::RustFn;
    type Worker = RustFnWorker;
}

impl SpawnerFactory for RustFnInProcessSpawnerFactory {
    fn build(
        &self,
        agent_def: &AgentDef,
        _hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
        build_inproc_from_registry::<RustFnWorker>(&self.registry, agent_def, "rust_fn")
    }
}

/// Shared `build` implementation behind the Lua and RustFn factories.
///
/// Reads `agent_def.spec["fn_id"]`, which must be a string. It then clones
/// the `WorkerFn` registered under that id in `registry`. Finally it returns
/// a fresh `InProcSpawner<W>` that holds the function under `agent_def.name`.
///
/// The type parameter `W` fixes the per-kind concrete Worker type at the
/// type level (the build-site half of the trait's associated-type binding
/// across the four-layer cascade).
///
/// # Errors
///
/// Returns `CompileError::InvalidSpec`, carrying the agent's name, in two
/// cases:
/// - `fn_id` is missing or not a string (the message is prefixed with
///   `kind_label`);
/// - no function is registered under `fn_id`.
fn build_inproc_from_registry<W>(
    registry: &HashMap<String, WorkerFn>,
    agent_def: &AgentDef,
    kind_label: &str,
) -> Result<Arc<dyn SpawnerAdapter>, CompileError>
where
    W: crate::worker::Worker + From<crate::worker::WorkerJoinHandler> + Send + Sync + 'static,
{
    let spec_error = |msg: String| CompileError::InvalidSpec {
        name: agent_def.name.to_string(),
        msg,
    };

    let fn_id = match agent_def.spec.get("fn_id").and_then(|v| v.as_str()) {
        Some(id) => id,
        None => {
            return Err(spec_error(format!(
                "{kind_label} spec: 'fn_id' (string) required"
            )))
        }
    };
    let worker_fn = match registry.get(fn_id).cloned() {
        Some(found) => found,
        None => {
            return Err(spec_error(format!(
                "fn_id '{fn_id}' not registered in factory"
            )))
        }
    };

    // The key must be `agent_name` (the flow's `Step.ref`). Both
    // `CompiledAgentTable` and the `InProcSpawner` resolve the function by
    // that name, so the two layers have to agree on it.
    let mut spawner: InProcSpawner<W> = InProcSpawner::<W>::typed();
    spawner
        .registry
        .insert(agent_def.name.to_string(), worker_fn);
    Ok(Arc::new(spawner))
}

/// Worker handle for the Lua kind. It tracks a task that evaluates Lua
/// inside an mlua VM, and currently carries nothing but a
/// `WorkerJoinHandler`.
///
/// It is kept as a distinct type so that future Lua-specific extensions
/// have a home. Examples include a mechanism for cancelling a running mlua
/// VM, or retaining Lua-side error types.
pub struct LuaWorker {
    /// Join handle and cancellation token of the underlying task.
    pub handler: crate::worker::WorkerJoinHandler,
}

impl From<crate::worker::WorkerJoinHandler> for LuaWorker {
    fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
        LuaWorker { handler }
    }
}

#[async_trait::async_trait]
impl crate::worker::Worker for LuaWorker {
    // Every method delegates straight to the embedded handler.

    fn id(&self) -> &crate::types::WorkerId {
        let handler = &self.handler;
        &handler.worker_id
    }

    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
        let token = &self.handler.cancel;
        token.clone()
    }

    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
        self.handler.await_completion().await
    }
}

/// Worker handle for the RustFn kind. It tracks a task that calls a Rust
/// closure directly, and wraps nothing but a `WorkerJoinHandler`.
///
/// Because the work is a plain function, there is little kind-specific
/// surface to add. The type exists mainly to pin down the per-kind type
/// binding.
pub struct RustFnWorker {
    /// Join handle and cancellation token of the underlying task.
    pub handler: crate::worker::WorkerJoinHandler,
}

impl From<crate::worker::WorkerJoinHandler> for RustFnWorker {
    fn from(handler: crate::worker::WorkerJoinHandler) -> Self {
        RustFnWorker { handler }
    }
}

#[async_trait::async_trait]
impl crate::worker::Worker for RustFnWorker {
    // Every method delegates straight to the embedded handler.

    fn id(&self) -> &crate::types::WorkerId {
        let handler = &self.handler;
        &handler.worker_id
    }

    fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
        let token = &self.handler.cancel;
        token.clone()
    }

    async fn join(self: Box<Self>) -> Result<(), crate::worker::adapter::WorkerError> {
        self.handler.await_completion().await
    }
}

/// Factory for `AgentKind::Operator`. It looks up the `Arc<dyn Operator>`
/// pre-registered under `spec.operator_ref` and wraps it in an
/// `OperatorSpawner`.
///
/// Spec shape:
/// ```jsonc
/// { "operator_ref": "main_ai" }     // Operator id pre-registered with the factory
/// ```
///
/// # Split of responsibilities with `OperatorDelegateMiddleware`
///
/// The two axes exist for different reasons:
///
/// - **This factory (`OperatorSpawnerFactory` → `OperatorSpawner`) — the
///   AgentSpec axis.** It bakes a separate Operator backend into each
///   `AgentDef`. A `kind = Operator` `AgentDef` names its backend through
///   `spec.operator_ref`, and at `compile()` time the `Arc<dyn Operator>`
///   is baked into `routes[agent_name]`. The `agent.md` loader
///   (`agent_md_loader`) defaults `kind` to `Operator`, so agents that
///   arrive through agent-profiles land here.
///
/// - **`OperatorDelegateMiddleware` — the Blueprint-global (session)
///   axis.** It delegates every agent to the same Operator backend. To
///   bind it session-wide:
///   - call `engine.register_operator(id, op)` plus
///     `attach_with_ids(.., operator_backend_id = Some(id))` at
///     session-attach time;
///   - declare `spawner_hints.layers = ["operator_delegate"]` to opt in.
///
///   `ctx.agent` is ignored, and the operator handles every spawn in that
///   session (for example, a MainAI-wide driver or a human-wide console).
///
/// # Exclusivity (a double fire is structurally impossible)
///
/// Both axes can be effective at once: the hint is declared, the session
/// has an operator backend, **and** the Blueprint has a `kind = Operator`
/// `AgentDef`. In that case `OperatorDelegateMiddleware` sits at the outer
/// end of the stack and **completely bypasses** `inner.spawn`. The
/// `OperatorSpawner` is never reached, so this factory's routes entry is
/// inert under those conditions.
///
/// This is not a double fire; the session axis is overriding the agent
/// axis. Consistent usage means picking one axis per use case.
///
/// # Interior mutability
///
/// The backend table lives behind an `Arc<RwLock<..>>`. Even after the
/// factory has been stored as `Arc<dyn SpawnerFactory>` in
/// `SpawnerRegistry`, a caller holding an `Arc` clone can still add
/// Operator backends dynamically via `register_operator(&self, id, op)`.
///
/// Typical uses:
/// - registering a `WSOperatorSession` under the session id on WebSocket
///   connect;
/// - binding agents that arrive via the `agent.md` loader to arbitrary
///   backends.
///
/// `build()` takes a fresh `read()` lock for each lookup.
///
/// # Panics
///
/// `register_operator`, `unregister_operator` and `build` all `expect` on
/// the lock, so each of them panics if the inner `RwLock` is poisoned.
pub struct OperatorSpawnerFactory {
    /// Operator backends keyed by the id that `spec.operator_ref` names.
    /// Written by `register_operator` / `unregister_operator`; read by `build`.
    operators: Arc<std::sync::RwLock<HashMap<String, Arc<dyn Operator>>>>,
}

impl OperatorSpawnerFactory {
    /// Start with no registered Operator backends.
    pub fn new() -> Self {
        Self {
            operators: Arc::new(std::sync::RwLock::new(HashMap::new())),
        }
    }

    /// Register an Operator backend dynamically through `&self`.
    ///
    /// Overwrites are allowed; the later registration wins. Callers can
    /// still reach this after the factory has been stored as
    /// `Arc<dyn SpawnerFactory>` in `SpawnerRegistry`, as long as they hold
    /// an `Arc` clone. Interior mutability is provided by the inner
    /// `RwLock`. Returns `&self` so calls can be chained.
    ///
    /// A backend displaced by an overwrite is dropped only after the write
    /// lock has been released. Its destructor therefore never runs inside
    /// the critical section.
    ///
    /// # Panics
    ///
    /// Panics if the inner `RwLock` is poisoned.
    pub fn register_operator(&self, id: impl Into<String>, op: Arc<dyn Operator>) -> &Self {
        // Convert before locking so caller-supplied `Into` code runs unlocked.
        let id = id.into();
        // The write guard is a temporary of this statement and is released
        // at the `;`. Only the displaced entry (if any) outlives it.
        let displaced = self
            .operators
            .write()
            .expect("OperatorSpawnerFactory.operators RwLock poisoned")
            .insert(id, op);
        // If this was the last reference, the old backend's `Drop` runs here,
        // outside the lock. A panicking `Drop` can no longer poison the
        // table, and a `Drop` that calls back into this factory no longer
        // re-enters the lock.
        drop(displaced);
        self
    }

    /// Dynamically unregister an id, e.g. to clean up when a WebSocket
    /// disconnects. A missing id is a no-op. Returns `&self` so calls can
    /// be chained.
    ///
    /// As with `register_operator`, the removed backend is dropped only
    /// after the write lock has been released.
    ///
    /// # Panics
    ///
    /// Panics if the inner `RwLock` is poisoned.
    pub fn unregister_operator(&self, id: &str) -> &Self {
        // Guard released at the `;`; only the removed entry (if any) survives.
        let removed = self
            .operators
            .write()
            .expect("OperatorSpawnerFactory.operators RwLock poisoned")
            .remove(id);
        // Run the backend's destructor (if this was the last `Arc`) unlocked.
        drop(removed);
        self
    }
}

impl Default for OperatorSpawnerFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl SpawnerFactoryKind for OperatorSpawnerFactory {
    const KIND: AgentKind = AgentKind::Operator;
    type Worker = crate::operator::OperatorWorker;
}

impl SpawnerFactory for OperatorSpawnerFactory {
    /// Resolve `agent_def.spec.operator_ref` to a registered Operator
    /// backend and wrap it, together with the profile's system prompt (if
    /// any), in an `OperatorSpawner`. The hint argument is not consulted.
    ///
    /// # Errors
    ///
    /// Returns `CompileError::InvalidSpec` in two cases:
    /// - `operator_ref` is missing or not a string;
    /// - no backend is registered under it. This message lists the
    ///   currently registered ids, sorted.
    fn build(
        &self,
        agent_def: &AgentDef,
        _hint: Option<&Value>,
    ) -> Result<Arc<dyn SpawnerAdapter>, CompileError> {
        let spec_error = |msg: String| CompileError::InvalidSpec {
            name: agent_def.name.to_string(),
            msg,
        };

        // Compile-time bake of `AgentDef.profile.system_prompt`:
        // - `Some` is used first at spawn time;
        // - `None` makes the spawner fall back to `fetch_prompt`
        //   (initial_directive).
        // The sibling `AgentBlockInProcessSpawnerFactory`
        // (agent_block/runtime.rs) does the same compile-time bake by putting
        // the profile into `BlockConfig.context`.
        let system_prompt = agent_def
            .profile
            .as_ref()
            .map(|profile| profile.system_prompt.clone());

        let op_ref = match agent_def.spec.get("operator_ref").and_then(|v| v.as_str()) {
            Some(op_ref) => op_ref,
            None => {
                return Err(spec_error(
                    "operator spec: 'operator_ref' (string) required".into(),
                ))
            }
        };

        // Hold the read guard only for the lookup. On a miss it is also
        // used to build the list of registered ids for the error message.
        let op = {
            let registered = self
                .operators
                .read()
                .expect("OperatorSpawnerFactory.operators RwLock poisoned");
            match registered.get(op_ref) {
                Some(found) => Arc::clone(found),
                None => {
                    let mut ids: Vec<&str> = registered.keys().map(String::as_str).collect();
                    ids.sort_unstable();
                    let names_list = if ids.is_empty() {
                        "<none>".to_string()
                    } else {
                        ids.join(", ")
                    };
                    return Err(spec_error(format!(
                        "operator_ref '{op_ref}' not registered in factory. \
                         Registered sids: [{names_list}]. \
                         Hint: call mse_operator_join(roles=[...]) to mint the sid first."
                    )));
                }
            }
        };

        Ok(Arc::new(OperatorSpawner::new(op, system_prompt)))
    }
}