agent_stream_kit/
askit.rs

1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::{Mutex as AsyncMutex, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentSpec, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::value::AgentValue;
16
// Capacity of the bounded tokio channel created for each agent that runs as an async task.
const MESSAGE_LIMIT: usize = 1024;
18
/// Shared runtime state for agents, flows, boards and observers.
///
/// Every field is wrapped in an `Arc`, so cloning an `ASKit` yields another handle to the
/// same underlying state rather than an independent copy.
#[derive(Clone)]
pub struct ASKit {
    // agent id -> agent
    // (each agent sits behind its own async mutex; the map itself is behind a std mutex)
    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,

    // agent id -> sender
    // (entries are inserted by `start_agent` and removed by `stop_agent`)
    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,

    // board name -> [board out agent id]
    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,

    // board name -> value
    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,

    // source agent id -> [target agent id / source handle / target handle]
    pub(crate) edges: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,

    // agent def name -> agent definition
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    // agent flows (flow id -> flow)
    pub(crate) flows: Arc<Mutex<AgentFlows>>,

    // agent def name -> config
    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,

    // message sender
    // (`None` until `spawn_message_loop` installs it; reset to `None` by `quit`)
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    // observers
    // (observer id -> observer; ids come from `new_observer_id`)
    pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
}
51
52impl ASKit {
53    pub fn new() -> Self {
54        Self {
55            agents: Default::default(),
56            agent_txs: Default::default(),
57            board_out_agents: Default::default(),
58            board_value: Default::default(),
59            edges: Default::default(),
60            defs: Default::default(),
61            flows: Default::default(),
62            global_configs_map: Default::default(),
63            tx: Arc::new(Mutex::new(None)),
64            observers: Default::default(),
65        }
66    }
67
68    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69        self.tx
70            .lock()
71            .unwrap()
72            .clone()
73            .ok_or(AgentError::TxNotInitialized)
74    }
75
76    pub fn init() -> Result<Self, AgentError> {
77        let askit = Self::new();
78        askit.register_agents();
79        Ok(askit)
80    }
81
    /// Delegates to `registry::register_inventory_agents`, passing this kit.
    fn register_agents(&self) {
        registry::register_inventory_agents(self);
    }
85
86    pub async fn ready(&self) -> Result<(), AgentError> {
87        self.spawn_message_loop().await?;
88        self.start_agent_flows().await?;
89        Ok(())
90    }
91
92    pub fn quit(&self) {
93        let mut tx_lock = self.tx.lock().unwrap();
94        *tx_lock = None;
95    }
96
97    pub fn register_agent(&self, def: AgentDefinition) {
98        let def_name = def.name.clone();
99        let def_global_configs = def.global_configs.clone();
100
101        let mut defs = self.defs.lock().unwrap();
102        defs.insert(def.name.clone(), def);
103
104        // if there is a global config, set it
105        if let Some(def_global_configs) = def_global_configs {
106            let mut new_configs = AgentConfigs::default();
107            for (key, config_entry) in def_global_configs.iter() {
108                new_configs.set(key.clone(), config_entry.value.clone());
109            }
110            self.set_global_configs(def_name, new_configs);
111        }
112    }
113
114    pub fn get_agent_definitions(&self) -> AgentDefinitions {
115        let defs = self.defs.lock().unwrap();
116        defs.clone()
117    }
118
119    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
120        let defs = self.defs.lock().unwrap();
121        defs.get(def_name).cloned()
122    }
123
124    pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
125        let defs = self.defs.lock().unwrap();
126        let Some(def) = defs.get(def_name) else {
127            return None;
128        };
129        def.default_configs.clone()
130    }
131
132    pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
133        let agents = self.agents.lock().unwrap();
134        let Some(agent) = agents.get(agent_id) else {
135            return None;
136        };
137        let agent = agent.blocking_lock();
138        Some(agent.spec().clone())
139    }
140
141    // flows
142
143    pub fn get_agent_flows(&self) -> AgentFlows {
144        let flows = self.flows.lock().unwrap();
145        flows.clone()
146    }
147
148    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
149        if !Self::is_valid_flow_name(name) {
150            return Err(AgentError::InvalidFlowName(name.into()));
151        }
152
153        let new_name = self.unique_flow_name(name);
154        let mut flows = self.flows.lock().unwrap();
155        let flow = AgentFlow::new(new_name.clone());
156        flows.insert(flow.id().to_string(), flow.clone());
157        Ok(flow)
158    }
159
160    pub fn rename_agent_flow(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
161        if !Self::is_valid_flow_name(new_name) {
162            return Err(AgentError::InvalidFlowName(new_name.into()));
163        }
164
165        // check if the new name is already used
166        let new_name = self.unique_flow_name(new_name);
167
168        let mut flows = self.flows.lock().unwrap();
169
170        // remove the original flow
171        let Some(mut flow) = flows.swap_remove(id) else {
172            return Err(AgentError::RenameFlowFailed(id.into()));
173        };
174
175        // insert renamed flow
176        flow.set_name(new_name.clone());
177        flows.insert(flow.id().to_string(), flow);
178        Ok(new_name)
179    }
180
181    fn is_valid_flow_name(new_name: &str) -> bool {
182        // Check if the name is empty
183        if new_name.trim().is_empty() {
184            return false;
185        }
186
187        // Checks for path-like names:
188        if new_name.contains('/') {
189            // Disallow leading, trailing, or consecutive slashes
190            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
191                return false;
192            }
193            // Disallow segments that are "." or ".."
194            if new_name
195                .split('/')
196                .any(|segment| segment == "." || segment == "..")
197            {
198                return false;
199            }
200        }
201
202        // Check if the name contains invalid characters
203        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
204        for c in invalid_chars {
205            if new_name.contains(c) {
206                return false;
207            }
208        }
209
210        true
211    }
212
213    pub fn unique_flow_name(&self, name: &str) -> String {
214        let mut new_name = name.trim().to_string();
215        let mut i = 2;
216        let flows = self.flows.lock().unwrap();
217        while flows.values().any(|flow| flow.name() == new_name) {
218            new_name = format!("{}{}", name, i);
219            i += 1;
220        }
221        new_name
222    }
223
224    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
225        let id = agent_flow.id();
226
227        // add the given flow into flows
228        {
229            let mut flows = self.flows.lock().unwrap();
230            if flows.contains_key(id) {
231                return Err(AgentError::DuplicateId(id.into()));
232            }
233            flows.insert(id.to_string(), agent_flow.clone());
234        }
235
236        // add nodes into agents
237        for node in agent_flow.nodes().iter() {
238            if let Err(e) = self.add_agent(id, node) {
239                log::error!("Failed to add_agent_node {}: {}", node.id, e);
240            }
241        }
242
243        // add edges into edges
244        for edge in agent_flow.edges().iter() {
245            self.add_edge(edge).unwrap_or_else(|e| {
246                log::error!("Failed to add_edge {}: {}", edge.source, e);
247            });
248        }
249
250        Ok(())
251    }
252
253    pub async fn remove_agent_flow(&self, id: &str) -> Result<(), AgentError> {
254        let flow = {
255            let mut flows = self.flows.lock().unwrap();
256            let Some(flow) = flows.swap_remove(id) else {
257                return Err(AgentError::FlowNotFound(id.to_string()));
258            };
259            flow.clone()
260        };
261
262        flow.stop(self).await?;
263
264        // Remove all nodes and edges associated with the flow
265        for node in flow.nodes() {
266            self.remove_agent(&node.id).await?;
267        }
268        for edge in flow.edges() {
269            self.remove_edge(edge);
270        }
271
272        Ok(())
273    }
274
275    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
276        let flow_id = flow.id();
277
278        let mut flows = self.flows.lock().unwrap();
279        flows.insert(flow_id.to_string(), flow);
280        Ok(())
281    }
282
283    pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
284        let def = self
285            .get_agent_definition(def_name)
286            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
287        AgentFlowNode::new(&def)
288    }
289
290    pub fn add_agent_flow_node(
291        &self,
292        flow_id: &str,
293        node: &AgentFlowNode,
294    ) -> Result<(), AgentError> {
295        let mut flows = self.flows.lock().unwrap();
296        let Some(flow) = flows.get_mut(flow_id) else {
297            return Err(AgentError::FlowNotFound(flow_id.to_string()));
298        };
299        flow.add_node(node.clone());
300        self.add_agent(flow_id, node)
301    }
302
303    pub(crate) fn add_agent(&self, flow_id: &str, node: &AgentFlowNode) -> Result<(), AgentError> {
304        let mut agents = self.agents.lock().unwrap();
305        if agents.contains_key(&node.id) {
306            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
307        }
308        let mut agent = agent_new(self.clone(), node.id.clone(), node.spec.clone())?;
309        agent.set_flow_id(flow_id.to_string());
310        agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
311        Ok(())
312    }
313
314    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
315        let agents = self.agents.lock().unwrap();
316        agents.get(agent_id).cloned()
317    }
318
319    pub fn add_agent_flow_edge(
320        &self,
321        flow_id: &str,
322        edge: &AgentFlowEdge,
323    ) -> Result<(), AgentError> {
324        let mut flows = self.flows.lock().unwrap();
325        let Some(flow) = flows.get_mut(flow_id) else {
326            return Err(AgentError::FlowNotFound(flow_id.to_string()));
327        };
328        flow.add_edge(edge.clone());
329        self.add_edge(edge)?;
330        Ok(())
331    }
332
333    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
334        // check if the source agent exists
335        {
336            let agents = self.agents.lock().unwrap();
337            if !agents.contains_key(&edge.source) {
338                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
339            }
340        }
341
342        // check if handles are valid
343        if edge.source_handle.is_empty() {
344            return Err(AgentError::EmptySourceHandle);
345        }
346        if edge.target_handle.is_empty() {
347            return Err(AgentError::EmptyTargetHandle);
348        }
349
350        let mut edges = self.edges.lock().unwrap();
351        if let Some(targets) = edges.get_mut(&edge.source) {
352            if targets
353                .iter()
354                .any(|(target, source_handle, target_handle)| {
355                    *target == edge.target
356                        && *source_handle == edge.source_handle
357                        && *target_handle == edge.target_handle
358                })
359            {
360                return Err(AgentError::EdgeAlreadyExists);
361            }
362            targets.push((
363                edge.target.clone(),
364                edge.source_handle.clone(),
365                edge.target_handle.clone(),
366            ));
367        } else {
368            edges.insert(
369                edge.source.clone(),
370                vec![(
371                    edge.target.clone(),
372                    edge.source_handle.clone(),
373                    edge.target_handle.clone(),
374                )],
375            );
376        }
377        Ok(())
378    }
379
380    pub async fn remove_agent_flow_node(
381        &self,
382        flow_id: &str,
383        node_id: &str,
384    ) -> Result<(), AgentError> {
385        {
386            let mut flows = self.flows.lock().unwrap();
387            let Some(flow) = flows.get_mut(flow_id) else {
388                return Err(AgentError::FlowNotFound(flow_id.to_string()));
389            };
390            flow.remove_node(node_id);
391        }
392        self.remove_agent(node_id).await?;
393        Ok(())
394    }
395
396    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
397        self.stop_agent(agent_id).await?;
398
399        // remove from edges
400        {
401            let mut edges = self.edges.lock().unwrap();
402            let mut sources_to_remove = Vec::new();
403            for (source, targets) in edges.iter_mut() {
404                targets.retain(|(target, _, _)| target != agent_id);
405                if targets.is_empty() {
406                    sources_to_remove.push(source.clone());
407                }
408            }
409            for source in sources_to_remove {
410                edges.swap_remove(&source);
411            }
412            edges.swap_remove(agent_id);
413        }
414
415        // remove from agents
416        {
417            let mut agents = self.agents.lock().unwrap();
418            agents.swap_remove(agent_id);
419        }
420
421        Ok(())
422    }
423
424    pub fn remove_agent_flow_edge(&self, flow_id: &str, edge_id: &str) -> Result<(), AgentError> {
425        let mut flows = self.flows.lock().unwrap();
426        let Some(flow) = flows.get_mut(flow_id) else {
427            return Err(AgentError::FlowNotFound(flow_id.to_string()));
428        };
429        let Some(edge) = flow.remove_edge(edge_id) else {
430            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
431        };
432        self.remove_edge(&edge);
433        Ok(())
434    }
435
436    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
437        let mut edges = self.edges.lock().unwrap();
438        if let Some(targets) = edges.get_mut(&edge.source) {
439            targets.retain(|(target, source_handle, target_handle)| {
440                *target != edge.target
441                    || *source_handle != edge.source_handle
442                    || *target_handle != edge.target_handle
443            });
444            if targets.is_empty() {
445                edges.swap_remove(&edge.source);
446            }
447        }
448    }
449
    /// Delegates to `flow::copy_sub_flow` with the given nodes and edges and returns its
    /// result unchanged. No state of this kit is read or modified.
    pub fn copy_sub_flow(
        &self,
        nodes: &Vec<AgentFlowNode>,
        edges: &Vec<AgentFlowEdge>,
    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
        flow::copy_sub_flow(nodes, edges)
    }
457
458    pub async fn start_agent_flow(&self, id: &str) -> Result<(), AgentError> {
459        let flow = {
460            let flows = self.flows.lock().unwrap();
461            let Some(flow) = flows.get(id) else {
462                return Err(AgentError::FlowNotFound(id.to_string()));
463            };
464            flow.clone()
465        };
466        flow.start(self).await?;
467        Ok(())
468    }
469
470    pub async fn stop_agent_flow(&self, id: &str) -> Result<(), AgentError> {
471        let flow = {
472            let flows = self.flows.lock().unwrap();
473            let Some(flow) = flows.get(id) else {
474                return Err(AgentError::FlowNotFound(id.to_string()));
475            };
476            flow.clone()
477        };
478        flow.stop(self).await?;
479        Ok(())
480    }
481
482    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
483        let agent = {
484            let agents = self.agents.lock().unwrap();
485            let Some(a) = agents.get(agent_id) else {
486                return Err(AgentError::AgentNotFound(agent_id.to_string()));
487            };
488            a.clone()
489        };
490        let def_name = {
491            let agent = agent.lock().await;
492            agent.def_name().to_string()
493        };
494        let uses_native_thread = {
495            let defs = self.defs.lock().unwrap();
496            let Some(def) = defs.get(&def_name) else {
497                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
498            };
499            def.native_thread
500        };
501        let agent_status = {
502            let agent = agent.lock().await;
503            agent.status().clone()
504        };
505        if agent_status == AgentStatus::Init {
506            log::info!("Starting agent {}", agent_id);
507
508            if uses_native_thread {
509                let (tx, rx) = std::sync::mpsc::channel();
510
511                {
512                    let mut agent_txs = self.agent_txs.lock().unwrap();
513                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
514                };
515
516                let agent_id = agent_id.to_string();
517                std::thread::spawn(async move || {
518                    if let Err(e) = agent.lock().await.start().await {
519                        log::error!("Failed to start agent {}: {}", agent_id, e);
520                    }
521
522                    while let Ok(message) = rx.recv() {
523                        match message {
524                            AgentMessage::Input { ctx, pin, value } => {
525                                agent
526                                    .lock()
527                                    .await
528                                    .process(ctx, pin, value)
529                                    .await
530                                    .unwrap_or_else(|e| {
531                                        log::error!("Process Error {}: {}", agent_id, e);
532                                    });
533                            }
534                            AgentMessage::Config { configs } => {
535                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
536                                    log::error!("Config Error {}: {}", agent_id, e);
537                                });
538                            }
539                            AgentMessage::Stop => {
540                                break;
541                            }
542                        }
543                    }
544                });
545            } else {
546                let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
547
548                {
549                    let mut agent_txs = self.agent_txs.lock().unwrap();
550                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
551                };
552
553                let agent_id = agent_id.to_string();
554                tokio::spawn(async move {
555                    {
556                        let mut agent_guard = agent.lock().await;
557                        if let Err(e) = agent_guard.start().await {
558                            log::error!("Failed to start agent {}: {}", agent_id, e);
559                        }
560                    }
561
562                    while let Some(message) = rx.recv().await {
563                        match message {
564                            AgentMessage::Input { ctx, pin, value } => {
565                                agent
566                                    .lock()
567                                    .await
568                                    .process(ctx, pin, value)
569                                    .await
570                                    .unwrap_or_else(|e| {
571                                        log::error!("Process Error {}: {}", agent_id, e);
572                                    });
573                            }
574                            AgentMessage::Config { configs } => {
575                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
576                                    log::error!("Config Error {}: {}", agent_id, e);
577                                });
578                            }
579                            AgentMessage::Stop => {
580                                rx.close();
581                                return;
582                            }
583                        }
584                    }
585                });
586                tokio::task::yield_now().await;
587            }
588        }
589        Ok(())
590    }
591
592    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
593        let agent = {
594            let agents = self.agents.lock().unwrap();
595            let Some(a) = agents.get(agent_id) else {
596                return Err(AgentError::AgentNotFound(agent_id.to_string()));
597            };
598            a.clone()
599        };
600
601        let agent_status = {
602            let agent = agent.lock().await;
603            agent.status().clone()
604        };
605        if agent_status == AgentStatus::Start {
606            log::info!("Stopping agent {}", agent_id);
607
608            {
609                let mut agent_txs = self.agent_txs.lock().unwrap();
610                if let Some(tx) = agent_txs.swap_remove(agent_id) {
611                    match tx {
612                        AgentMessageSender::Sync(tx) => {
613                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
614                                log::error!(
615                                    "Failed to send stop message to agent {}: {}",
616                                    agent_id,
617                                    e
618                                );
619                            });
620                        }
621                        AgentMessageSender::Async(tx) => {
622                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
623                                log::error!(
624                                    "Failed to send stop message to agent {}: {}",
625                                    agent_id,
626                                    e
627                                );
628                            });
629                        }
630                    }
631                }
632            }
633
634            agent.lock().await.stop().await?;
635        }
636
637        Ok(())
638    }
639
640    pub async fn set_agent_configs(
641        &self,
642        agent_id: String,
643        configs: AgentConfigs,
644    ) -> Result<(), AgentError> {
645        let agent = {
646            let agents = self.agents.lock().unwrap();
647            let Some(a) = agents.get(&agent_id) else {
648                return Err(AgentError::AgentNotFound(agent_id.to_string()));
649            };
650            a.clone()
651        };
652
653        let agent_status = {
654            let agent = agent.lock().await;
655            agent.status().clone()
656        };
657        if agent_status == AgentStatus::Init {
658            agent.lock().await.set_configs(configs.clone())?;
659        } else if agent_status == AgentStatus::Start {
660            let tx = {
661                let agent_txs = self.agent_txs.lock().unwrap();
662                let Some(tx) = agent_txs.get(&agent_id) else {
663                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
664                };
665                tx.clone()
666            };
667            let message = AgentMessage::Config { configs };
668            match tx {
669                AgentMessageSender::Sync(tx) => {
670                    tx.send(message).map_err(|_| {
671                        AgentError::SendMessageFailed("Failed to send config message".to_string())
672                    })?;
673                }
674                AgentMessageSender::Async(tx) => {
675                    tx.send(message).await.map_err(|_| {
676                        AgentError::SendMessageFailed("Failed to send config message".to_string())
677                    })?;
678                }
679            }
680        }
681        Ok(())
682    }
683
684    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
685        let global_configs_map = self.global_configs_map.lock().unwrap();
686        global_configs_map.get(def_name).cloned()
687    }
688
689    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
690        let mut global_configs_map = self.global_configs_map.lock().unwrap();
691
692        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
693            global_configs_map.insert(def_name, configs);
694            return;
695        };
696
697        for (key, value) in configs {
698            existing_configs.set(key, value);
699        }
700    }
701
702    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
703        let global_configs_map = self.global_configs_map.lock().unwrap();
704        global_configs_map.clone()
705    }
706
707    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
708        for (agent_name, new_configs) in new_configs_map {
709            self.set_global_configs(agent_name, new_configs);
710        }
711    }
712
713    pub async fn agent_input(
714        &self,
715        agent_id: String,
716        ctx: AgentContext,
717        pin: String,
718        value: AgentValue,
719    ) -> Result<(), AgentError> {
720        let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
721            let agents = self.agents.lock().unwrap();
722            let Some(a) = agents.get(&agent_id) else {
723                return Err(AgentError::AgentNotFound(agent_id.to_string()));
724            };
725            a.clone()
726        };
727
728        let agent_status = {
729            let agent = agent.lock().await;
730            agent.status().clone()
731        };
732        if agent_status != AgentStatus::Start {
733            return Ok(());
734        }
735
736        if pin.starts_with("config:") {
737            let config_key = pin[7..].to_string();
738            let mut agent = agent.lock().await;
739            agent.set_config(config_key.clone(), value.clone())?;
740            return Ok(());
741        }
742
743        let message = AgentMessage::Input {
744            ctx,
745            pin: pin.clone(),
746            value,
747        };
748
749        let tx = {
750            let agent_txs = self.agent_txs.lock().unwrap();
751            let Some(tx) = agent_txs.get(&agent_id) else {
752                return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
753            };
754            tx.clone()
755        };
756        match tx {
757            AgentMessageSender::Sync(tx) => {
758                tx.send(message).map_err(|_| {
759                    AgentError::SendMessageFailed("Failed to send input message".to_string())
760                })?;
761            }
762            AgentMessageSender::Async(tx) => {
763                tx.send(message).await.map_err(|_| {
764                    AgentError::SendMessageFailed("Failed to send input message".to_string())
765                })?;
766            }
767        }
768
769        self.emit_agent_input(agent_id.to_string(), pin);
770
771        Ok(())
772    }
773
    /// Async variant: delegates to `message::send_agent_out` with this kit and the given
    /// arguments, and returns its result.
    pub async fn send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::send_agent_out(self, agent_id, ctx, pin, value).await
    }
783
    /// Synchronous variant: delegates to `message::try_send_agent_out` with this kit and
    /// the given arguments, and returns its result.
    pub fn try_send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_agent_out(self, agent_id, ctx, pin, value)
    }
793
    /// Sends `value` to the board `name` via `try_send_board_out`, using a fresh
    /// `AgentContext::new()` as the context.
    pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
        self.try_send_board_out(name, AgentContext::new(), value)
    }
797
    /// Delegates to `message::try_send_board_out` with this kit and the given arguments,
    /// and returns its result.
    pub(crate) fn try_send_board_out(
        &self,
        name: String,
        ctx: AgentContext,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_board_out(self, name, ctx, value)
    }
806
807    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
808        // TODO: settings for the channel size
809        let (tx, mut rx) = mpsc::channel(4096);
810        {
811            let mut tx_lock = self.tx.lock().unwrap();
812            *tx_lock = Some(tx);
813        }
814
815        // spawn the main loop
816        let askit = self.clone();
817        tokio::spawn(async move {
818            while let Some(message) = rx.recv().await {
819                use AgentEventMessage::*;
820
821                match message {
822                    AgentOut {
823                        agent,
824                        ctx,
825                        pin,
826                        value,
827                    } => {
828                        message::agent_out(&askit, agent, ctx, pin, value).await;
829                    }
830                    BoardOut { name, ctx, value } => {
831                        message::board_out(&askit, name, ctx, value).await;
832                    }
833                }
834            }
835        });
836
837        tokio::task::yield_now().await;
838
839        Ok(())
840    }
841
842    async fn start_agent_flows(&self) -> Result<(), AgentError> {
843        let agent_flow_ids;
844        {
845            let agent_flows = self.flows.lock().unwrap();
846            agent_flow_ids = agent_flows.keys().cloned().collect::<Vec<_>>();
847        }
848        for id in agent_flow_ids {
849            self.start_agent_flow(&id).await.unwrap_or_else(|e| {
850                log::error!("Failed to start agent flow: {}", e);
851            });
852        }
853        Ok(())
854    }
855
856    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
857        let mut observers = self.observers.lock().unwrap();
858        let observer_id = new_observer_id();
859        observers.insert(observer_id, observer);
860        observer_id
861    }
862
863    pub fn unsubscribe(&self, observer_id: usize) {
864        let mut observers = self.observers.lock().unwrap();
865        observers.swap_remove(&observer_id);
866    }
867
    /// Notifies observers with `ASKitEvent::AgentDisplay(agent_id, key, value)`.
    pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
        self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
    }
871
    /// Notifies observers with `ASKitEvent::AgentError(agent_id, message)`.
    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
    }
875
    /// Notifies observers with `ASKitEvent::AgentIn(agent_id, pin)`.
    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
    }
879
    /// Notifies observers with `ASKitEvent::AgentSpecUpdated(agent_id)`.
    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
        self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
    }
883
884    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
885        // ignore variables
886        if name.starts_with('%') {
887            return;
888        }
889        self.notify_observers(ASKitEvent::Board(name, value));
890    }
891
892    fn notify_observers(&self, event: ASKitEvent) {
893        let observers = self.observers.lock().unwrap();
894        for (_id, observer) in observers.iter() {
895            observer.notify(&event);
896        }
897    }
898}
899
/// Events delivered to observers through `ASKitObserver::notify`.
#[derive(Clone, Debug)]
pub enum ASKitEvent {
    AgentDisplay(String, String, AgentValue), // (agent_id, key, value)
    AgentError(String, String),               // (agent_id, message)
    AgentIn(String, String),                  // (agent_id, pin)
    AgentSpecUpdated(String),                 // (agent_id)
    Board(String, AgentValue),                // (board name, value)
}
908
/// Receiver of `ASKitEvent`s; register an implementation with `ASKit::subscribe`.
pub trait ASKitObserver {
    /// Called once per emitted event. `ASKit::notify_observers` holds the observers lock
    /// while calling this.
    fn notify(&self, event: &ASKitEvent);
}
912
// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

// Returns the current counter value as a fresh observer id and advances the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
918
919// Agent Message
920
/// Sending half of an agent's message channel, as stored in `ASKit::agent_txs`.
#[derive(Clone)]
pub enum AgentMessageSender {
    /// `std::sync::mpsc` sender; `start_agent` registers it for native-thread agents.
    Sync(std::sync::mpsc::Sender<AgentMessage>),
    /// tokio `mpsc` sender; `start_agent` registers it for agents run as tokio tasks.
    Async(mpsc::Sender<AgentMessage>),
}
925}