agent_stream_kit/
askit.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentDefaultConfigs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::value::AgentValue;
16
17#[derive(Clone)]
18pub struct ASKit {
19    // agent id -> agent
20    pub(crate) agents:
21        Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,
22
23    // agent id -> sender
24    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
25
26    // board name -> [board out agent id]
27    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
28
29    // board name -> value
30    pub(crate) board_value: Arc<Mutex<HashMap<String, AgentValue>>>,
31
32    // source agent id -> [target agent id / source handle / target handle]
33    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
34
35    // agent def name -> agent definition
36    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
37
38    // agent flows (flow id -> flow)
39    pub(crate) flows: Arc<Mutex<AgentFlows>>,
40
41    // agent def name -> config
42    pub(crate) global_configs_map: Arc<Mutex<HashMap<String, AgentConfigs>>>,
43
44    // message sender
45    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
46
47    // observers
48    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
49}
50
51impl ASKit {
52    pub fn new() -> Self {
53        Self {
54            agents: Default::default(),
55            agent_txs: Default::default(),
56            board_out_agents: Default::default(),
57            board_value: Default::default(),
58            edges: Default::default(),
59            defs: Default::default(),
60            flows: Default::default(),
61            global_configs_map: Default::default(),
62            tx: Arc::new(Mutex::new(None)),
63            observers: Default::default(),
64        }
65    }
66
67    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68        self.tx
69            .lock()
70            .unwrap()
71            .clone()
72            .ok_or(AgentError::TxNotInitialized)
73    }
74
75    pub fn init() -> Result<Self, AgentError> {
76        let askit = Self::new();
77        askit.register_agents();
78        Ok(askit)
79    }
80
81    fn register_agents(&self) {
82        registry::register_inventory_agents(self);
83    }
84
85    pub async fn ready(&self) -> Result<(), AgentError> {
86        self.spawn_message_loop()?;
87        self.start_agent_flows().await?;
88        Ok(())
89    }
90
91    pub fn quit(&self) {
92        let mut tx_lock = self.tx.lock().unwrap();
93        *tx_lock = None;
94    }
95
96    pub fn register_agent(&self, def: AgentDefinition) {
97        let def_name = def.name.clone();
98        let def_global_configs = def.global_configs.clone();
99
100        let mut defs = self.defs.lock().unwrap();
101        defs.insert(def.name.clone(), def);
102
103        // if there is a global config, set it
104        if let Some(def_global_configs) = def_global_configs {
105            let mut new_configs = AgentConfigs::default();
106            for (key, config_entry) in def_global_configs.iter() {
107                new_configs.set(key.clone(), config_entry.value.clone());
108            }
109            self.set_global_configs(def_name, new_configs);
110        }
111    }
112
113    pub fn get_agent_definitions(&self) -> AgentDefinitions {
114        let defs = self.defs.lock().unwrap();
115        defs.clone()
116    }
117
118    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119        let defs = self.defs.lock().unwrap();
120        defs.get(def_name).cloned()
121    }
122
123    pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentDefaultConfigs> {
124        let defs = self.defs.lock().unwrap();
125        let Some(def) = defs.get(def_name) else {
126            return None;
127        };
128        def.default_configs.clone()
129    }
130
131    // flows
132
133    pub fn get_agent_flows(&self) -> AgentFlows {
134        let flows = self.flows.lock().unwrap();
135        flows.clone()
136    }
137
138    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139        if !Self::is_valid_flow_name(name) {
140            return Err(AgentError::InvalidFlowName(name.into()));
141        }
142
143        let new_name = self.unique_flow_name(name);
144        let mut flows = self.flows.lock().unwrap();
145        let flow = AgentFlow::new(new_name.clone());
146        flows.insert(flow.id().to_string(), flow.clone());
147        Ok(flow)
148    }
149
150    pub fn rename_agent_flow(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
151        if !Self::is_valid_flow_name(new_name) {
152            return Err(AgentError::InvalidFlowName(new_name.into()));
153        }
154
155        // check if the new name is already used
156        let new_name = self.unique_flow_name(new_name);
157
158        let mut flows = self.flows.lock().unwrap();
159
160        // remove the original flow
161        let Some(mut flow) = flows.remove(id) else {
162            return Err(AgentError::RenameFlowFailed(id.into()));
163        };
164
165        // insert renamed flow
166        flow.set_name(new_name.clone());
167        flows.insert(flow.id().to_string(), flow);
168        Ok(new_name)
169    }
170
171    fn is_valid_flow_name(new_name: &str) -> bool {
172        // Check if the name is empty
173        if new_name.trim().is_empty() {
174            return false;
175        }
176
177        // Checks for path-like names:
178        if new_name.contains('/') {
179            // Disallow leading, trailing, or consecutive slashes
180            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
181                return false;
182            }
183            // Disallow segments that are "." or ".."
184            if new_name
185                .split('/')
186                .any(|segment| segment == "." || segment == "..")
187            {
188                return false;
189            }
190        }
191
192        // Check if the name contains invalid characters
193        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
194        for c in invalid_chars {
195            if new_name.contains(c) {
196                return false;
197            }
198        }
199
200        true
201    }
202
203    pub fn unique_flow_name(&self, name: &str) -> String {
204        let mut new_name = name.trim().to_string();
205        let mut i = 2;
206        let flows = self.flows.lock().unwrap();
207        while flows.values().any(|flow| flow.name() == new_name) {
208            new_name = format!("{}{}", name, i);
209            i += 1;
210        }
211        new_name
212    }
213
214    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215        let id = agent_flow.id();
216
217        // add the given flow into flows
218        {
219            let mut flows = self.flows.lock().unwrap();
220            if flows.contains_key(id) {
221                return Err(AgentError::DuplicateId(id.into()));
222            }
223            flows.insert(id.to_string(), agent_flow.clone());
224        }
225
226        // add nodes into agents
227        for node in agent_flow.nodes().iter() {
228            self.add_agent(id, node).unwrap_or_else(|e| {
229                log::error!("Failed to add_agent_node {}: {}", node.id, e);
230            });
231        }
232
233        // add edges into edges
234        for edge in agent_flow.edges().iter() {
235            self.add_edge(edge).unwrap_or_else(|e| {
236                log::error!("Failed to add_edge {}: {}", edge.source, e);
237            });
238        }
239
240        Ok(())
241    }
242
243    pub async fn remove_agent_flow(&self, id: &str) -> Result<(), AgentError> {
244        let flow = {
245            let mut flows = self.flows.lock().unwrap();
246            let Some(flow) = flows.remove(id) else {
247                return Err(AgentError::FlowNotFound(id.to_string()));
248            };
249            flow.clone()
250        };
251
252        flow.stop(self).await?;
253
254        // Remove all nodes and edges associated with the flow
255        for node in flow.nodes() {
256            self.remove_agent(&node.id).await?;
257        }
258        for edge in flow.edges() {
259            self.remove_edge(edge);
260        }
261
262        Ok(())
263    }
264
265    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266        let flow_id = flow.id();
267
268        let mut flows = self.flows.lock().unwrap();
269        flows.insert(flow_id.to_string(), flow);
270        Ok(())
271    }
272
273    pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
274        let def = self
275            .get_agent_definition(def_name)
276            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
277        AgentFlowNode::new(&def)
278    }
279
280    pub fn add_agent_flow_node(
281        &self,
282        flow_id: &str,
283        node: &AgentFlowNode,
284    ) -> Result<(), AgentError> {
285        let mut flows = self.flows.lock().unwrap();
286        let Some(flow) = flows.get_mut(flow_id) else {
287            return Err(AgentError::FlowNotFound(flow_id.to_string()));
288        };
289        flow.add_node(node.clone());
290        self.add_agent(flow_id, node)?;
291        Ok(())
292    }
293
294    pub(crate) fn add_agent(&self, flow_id: &str, node: &AgentFlowNode) -> Result<(), AgentError> {
295        let mut agents = self.agents.lock().unwrap();
296        if agents.contains_key(&node.id) {
297            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
298        }
299        if let Ok(mut agent) = agent_new(
300            self.clone(),
301            node.id.clone(),
302            &node.def_name,
303            node.configs.clone(),
304        ) {
305            agent.set_flow_id(flow_id.to_string());
306            agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
307        } else {
308            return Err(AgentError::AgentCreationFailed(node.id.to_string()));
309        }
310        Ok(())
311    }
312
313    pub fn add_agent_flow_edge(
314        &self,
315        flow_id: &str,
316        edge: &AgentFlowEdge,
317    ) -> Result<(), AgentError> {
318        let mut flows = self.flows.lock().unwrap();
319        let Some(flow) = flows.get_mut(flow_id) else {
320            return Err(AgentError::FlowNotFound(flow_id.to_string()));
321        };
322        flow.add_edge(edge.clone());
323        self.add_edge(edge)?;
324        Ok(())
325    }
326
327    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
328        // check if the source agent exists
329        {
330            let agents = self.agents.lock().unwrap();
331            if !agents.contains_key(&edge.source) {
332                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
333            }
334        }
335
336        // check if handles are valid
337        if edge.source_handle.is_empty() {
338            return Err(AgentError::EmptySourceHandle);
339        }
340        if edge.target_handle.is_empty() {
341            return Err(AgentError::EmptyTargetHandle);
342        }
343
344        let mut edges = self.edges.lock().unwrap();
345        if let Some(targets) = edges.get_mut(&edge.source) {
346            if targets
347                .iter()
348                .any(|(target, source_handle, target_handle)| {
349                    *target == edge.target
350                        && *source_handle == edge.source_handle
351                        && *target_handle == edge.target_handle
352                })
353            {
354                return Err(AgentError::EdgeAlreadyExists);
355            }
356            targets.push((
357                edge.target.clone(),
358                edge.source_handle.clone(),
359                edge.target_handle.clone(),
360            ));
361        } else {
362            edges.insert(
363                edge.source.clone(),
364                vec![(
365                    edge.target.clone(),
366                    edge.source_handle.clone(),
367                    edge.target_handle.clone(),
368                )],
369            );
370        }
371        Ok(())
372    }
373
374    pub async fn remove_agent_flow_node(
375        &self,
376        flow_id: &str,
377        node_id: &str,
378    ) -> Result<(), AgentError> {
379        {
380            let mut flows = self.flows.lock().unwrap();
381            let Some(flow) = flows.get_mut(flow_id) else {
382                return Err(AgentError::FlowNotFound(flow_id.to_string()));
383            };
384            flow.remove_node(node_id);
385        }
386        self.remove_agent(node_id).await?;
387        Ok(())
388    }
389
390    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
391        self.stop_agent(agent_id).await?;
392
393        // remove from edges
394        {
395            let mut edges = self.edges.lock().unwrap();
396            let mut sources_to_remove = Vec::new();
397            for (source, targets) in edges.iter_mut() {
398                targets.retain(|(target, _, _)| target != agent_id);
399                if targets.is_empty() {
400                    sources_to_remove.push(source.clone());
401                }
402            }
403            for source in sources_to_remove {
404                edges.remove(&source);
405            }
406            edges.remove(agent_id);
407        }
408
409        // remove from agents
410        {
411            let mut agents = self.agents.lock().unwrap();
412            agents.remove(agent_id);
413        }
414
415        Ok(())
416    }
417
418    pub fn remove_agent_flow_edge(&self, flow_id: &str, edge_id: &str) -> Result<(), AgentError> {
419        let mut flows = self.flows.lock().unwrap();
420        let Some(flow) = flows.get_mut(flow_id) else {
421            return Err(AgentError::FlowNotFound(flow_id.to_string()));
422        };
423        let Some(edge) = flow.remove_edge(edge_id) else {
424            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
425        };
426        self.remove_edge(&edge);
427        Ok(())
428    }
429
430    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
431        let mut edges = self.edges.lock().unwrap();
432        if let Some(targets) = edges.get_mut(&edge.source) {
433            targets.retain(|(target, source_handle, target_handle)| {
434                *target != edge.target
435                    || *source_handle != edge.source_handle
436                    || *target_handle != edge.target_handle
437            });
438            if targets.is_empty() {
439                edges.remove(&edge.source);
440            }
441        }
442    }
443
444    pub fn copy_sub_flow(
445        &self,
446        nodes: &Vec<AgentFlowNode>,
447        edges: &Vec<AgentFlowEdge>,
448    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
449        flow::copy_sub_flow(nodes, edges)
450    }
451
452    pub async fn start_agent_flow(&self, id: &str) -> Result<(), AgentError> {
453        let flow = {
454            let flows = self.flows.lock().unwrap();
455            let Some(flow) = flows.get(id) else {
456                return Err(AgentError::FlowNotFound(id.to_string()));
457            };
458            flow.clone()
459        };
460        flow.start(self).await?;
461        Ok(())
462    }
463
464    pub async fn stop_agent_flow(&self, id: &str) -> Result<(), AgentError> {
465        let flow = {
466            let flows = self.flows.lock().unwrap();
467            let Some(flow) = flows.get(id) else {
468                return Err(AgentError::FlowNotFound(id.to_string()));
469            };
470            flow.clone()
471        };
472        flow.stop(self).await?;
473        Ok(())
474    }
475
476    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
477        let agent = {
478            let agents = self.agents.lock().unwrap();
479            let Some(a) = agents.get(agent_id) else {
480                return Err(AgentError::AgentNotFound(agent_id.to_string()));
481            };
482            a.clone()
483        };
484        let def_name = {
485            let agent = agent.lock().await;
486            agent.def_name().to_string()
487        };
488        let uses_native_thread = {
489            let defs = self.defs.lock().unwrap();
490            let Some(def) = defs.get(&def_name) else {
491                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
492            };
493            def.native_thread
494        };
495        let agent_status = {
496            let agent = agent.lock().await;
497            agent.status().clone()
498        };
499        if agent_status == AgentStatus::Init {
500            log::info!("Starting agent {}", agent_id);
501
502            if uses_native_thread {
503                let (tx, rx) = std::sync::mpsc::channel();
504
505                {
506                    let mut agent_txs = self.agent_txs.lock().unwrap();
507                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
508                };
509
510                let agent_id = agent_id.to_string();
511                std::thread::spawn(async move || {
512                    if let Err(e) = agent.lock().await.start().await {
513                        log::error!("Failed to start agent {}: {}", agent_id, e);
514                    }
515
516                    while let Ok(message) = rx.recv() {
517                        match message {
518                            AgentMessage::Input { ctx, pin, value } => {
519                                agent
520                                    .lock()
521                                    .await
522                                    .process(ctx, pin, value)
523                                    .await
524                                    .unwrap_or_else(|e| {
525                                        log::error!("Process Error {}: {}", agent_id, e);
526                                    });
527                            }
528                            AgentMessage::Config { configs } => {
529                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
530                                    log::error!("Config Error {}: {}", agent_id, e);
531                                });
532                            }
533                            AgentMessage::Stop => {
534                                break;
535                            }
536                        }
537                    }
538                });
539            } else {
540                let (tx, mut rx) = mpsc::channel(32);
541
542                {
543                    let mut agent_txs = self.agent_txs.lock().unwrap();
544                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
545                };
546
547                let agent_id = agent_id.to_string();
548                tokio::spawn(async move {
549                    {
550                        let mut agent_guard = agent.lock().await;
551                        if let Err(e) = agent_guard.start().await {
552                            log::error!("Failed to start agent {}: {}", agent_id, e);
553                        }
554                    }
555
556                    while let Some(message) = rx.recv().await {
557                        match message {
558                            AgentMessage::Input { ctx, pin, value } => {
559                                agent
560                                    .lock()
561                                    .await
562                                    .process(ctx, pin, value)
563                                    .await
564                                    .unwrap_or_else(|e| {
565                                        log::error!("Process Error {}: {}", agent_id, e);
566                                    });
567                            }
568                            AgentMessage::Config { configs } => {
569                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
570                                    log::error!("Config Error {}: {}", agent_id, e);
571                                });
572                            }
573                            AgentMessage::Stop => {
574                                rx.close();
575                                return;
576                            }
577                        }
578                    }
579                });
580            }
581        }
582        Ok(())
583    }
584
585    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
586        let agent = {
587            let agents = self.agents.lock().unwrap();
588            let Some(a) = agents.get(agent_id) else {
589                return Err(AgentError::AgentNotFound(agent_id.to_string()));
590            };
591            a.clone()
592        };
593
594        let agent_status = {
595            let agent = agent.lock().await;
596            agent.status().clone()
597        };
598        if agent_status == AgentStatus::Start {
599            log::info!("Stopping agent {}", agent_id);
600
601            {
602                let mut agent_txs = self.agent_txs.lock().unwrap();
603                if let Some(tx) = agent_txs.remove(agent_id) {
604                    match tx {
605                        AgentMessageSender::Sync(tx) => {
606                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
607                                log::error!(
608                                    "Failed to send stop message to agent {}: {}",
609                                    agent_id,
610                                    e
611                                );
612                            });
613                        }
614                        AgentMessageSender::Async(tx) => {
615                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
616                                log::error!(
617                                    "Failed to send stop message to agent {}: {}",
618                                    agent_id,
619                                    e
620                                );
621                            });
622                        }
623                    }
624                }
625            }
626
627            agent.lock().await.stop().await?;
628        }
629
630        Ok(())
631    }
632
633    pub async fn set_agent_configs(
634        &self,
635        agent_id: String,
636        configs: AgentConfigs,
637    ) -> Result<(), AgentError> {
638        let agent = {
639            let agents = self.agents.lock().unwrap();
640            let Some(a) = agents.get(&agent_id) else {
641                return Err(AgentError::AgentNotFound(agent_id.to_string()));
642            };
643            a.clone()
644        };
645
646        let agent_status = {
647            let agent = agent.lock().await;
648            agent.status().clone()
649        };
650        if agent_status == AgentStatus::Init {
651            agent.lock().await.set_configs(configs.clone())?;
652        } else if agent_status == AgentStatus::Start {
653            let tx = {
654                let agent_txs = self.agent_txs.lock().unwrap();
655                let Some(tx) = agent_txs.get(&agent_id) else {
656                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
657                };
658                tx.clone()
659            };
660            let message = AgentMessage::Config { configs };
661            match tx {
662                AgentMessageSender::Sync(tx) => {
663                    tx.send(message).map_err(|_| {
664                        AgentError::SendMessageFailed("Failed to send config message".to_string())
665                    })?;
666                }
667                AgentMessageSender::Async(tx) => {
668                    tx.send(message).await.map_err(|_| {
669                        AgentError::SendMessageFailed("Failed to send config message".to_string())
670                    })?;
671                }
672            }
673        }
674        Ok(())
675    }
676
677    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
678        let global_configs_map = self.global_configs_map.lock().unwrap();
679        global_configs_map.get(def_name).cloned()
680    }
681
682    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
683        let mut global_configs_map = self.global_configs_map.lock().unwrap();
684
685        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
686            global_configs_map.insert(def_name, configs);
687            return;
688        };
689
690        for (key, value) in configs {
691            existing_configs.set(key, value);
692        }
693    }
694
695    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
696        for (agent_name, new_configs) in new_configs_map {
697            self.set_global_configs(agent_name, new_configs);
698        }
699    }
700
701    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
702        let global_configs_map = self.global_configs_map.lock().unwrap();
703        global_configs_map.clone()
704    }
705
706    pub(crate) async fn agent_input(
707        &self,
708        agent_id: String,
709        ctx: AgentContext,
710        pin: String,
711        value: AgentValue,
712    ) -> Result<(), AgentError> {
713        let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
714            let agents = self.agents.lock().unwrap();
715            let Some(a) = agents.get(&agent_id) else {
716                return Err(AgentError::AgentNotFound(agent_id.to_string()));
717            };
718            a.clone()
719        };
720
721        let agent_status = {
722            let agent = agent.lock().await;
723            agent.status().clone()
724        };
725        if agent_status != AgentStatus::Start {
726            return Ok(());
727        }
728
729        if pin.starts_with("config:") {
730            let config_key = pin[7..].to_string();
731            let mut agent = agent.lock().await;
732            agent.set_config(config_key.clone(), value.clone())?;
733            return Ok(());
734        }
735
736        let message = AgentMessage::Input {
737            ctx,
738            pin: pin.clone(),
739            value,
740        };
741
742        let tx = {
743            let agent_txs = self.agent_txs.lock().unwrap();
744            let Some(tx) = agent_txs.get(&agent_id) else {
745                return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
746            };
747            tx.clone()
748        };
749        match tx {
750            AgentMessageSender::Sync(tx) => {
751                tx.send(message).map_err(|_| {
752                    AgentError::SendMessageFailed("Failed to send input message".to_string())
753                })?;
754            }
755            AgentMessageSender::Async(tx) => {
756                tx.send(message).await.map_err(|_| {
757                    AgentError::SendMessageFailed("Failed to send input message".to_string())
758                })?;
759            }
760        }
761        self.emit_agent_input(agent_id.to_string(), pin);
762
763        Ok(())
764    }
765
766    pub async fn send_agent_out(
767        &self,
768        agent_id: String,
769        ctx: AgentContext,
770        pin: String,
771        value: AgentValue,
772    ) -> Result<(), AgentError> {
773        message::send_agent_out(self, agent_id, ctx, pin, value).await
774    }
775
776    pub fn try_send_agent_out(
777        &self,
778        agent_id: String,
779        ctx: AgentContext,
780        pin: String,
781        value: AgentValue,
782    ) -> Result<(), AgentError> {
783        message::try_send_agent_out(self, agent_id, ctx, pin, value)
784    }
785
786    pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
787        self.try_send_board_out(name, AgentContext::new(), value)
788    }
789
790    pub(crate) fn try_send_board_out(
791        &self,
792        name: String,
793        ctx: AgentContext,
794        value: AgentValue,
795    ) -> Result<(), AgentError> {
796        message::try_send_board_out(self, name, ctx, value)
797    }
798
799    fn spawn_message_loop(&self) -> Result<(), AgentError> {
800        // TODO: settings for the channel size
801        let (tx, mut rx) = mpsc::channel(4096);
802        {
803            let mut tx_lock = self.tx.lock().unwrap();
804            *tx_lock = Some(tx);
805        }
806
807        // spawn the main loop
808        let askit = self.clone();
809        tokio::spawn(async move {
810            while let Some(message) = rx.recv().await {
811                use AgentEventMessage::*;
812
813                match message {
814                    AgentOut {
815                        agent,
816                        ctx,
817                        pin,
818                        value,
819                    } => {
820                        message::agent_out(&askit, agent, ctx, pin, value).await;
821                    }
822                    BoardOut { name, ctx, value } => {
823                        message::board_out(&askit, name, ctx, value).await;
824                    }
825                }
826            }
827        });
828
829        Ok(())
830    }
831
832    async fn start_agent_flows(&self) -> Result<(), AgentError> {
833        let agent_flow_ids;
834        {
835            let agent_flows = self.flows.lock().unwrap();
836            agent_flow_ids = agent_flows.keys().cloned().collect::<Vec<_>>();
837        }
838        for id in agent_flow_ids {
839            self.start_agent_flow(&id).await.unwrap_or_else(|e| {
840                log::error!("Failed to start agent flow: {}", e);
841            });
842        }
843        Ok(())
844    }
845
846    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
847        let mut observers = self.observers.lock().unwrap();
848        let observer_id = new_observer_id();
849        observers.insert(observer_id, observer);
850        observer_id
851    }
852
853    pub fn unsubscribe(&self, observer_id: usize) {
854        let mut observers = self.observers.lock().unwrap();
855        observers.remove(&observer_id);
856    }
857
858    pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
859        self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
860    }
861
862    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
863        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
864    }
865
866    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
867        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
868    }
869
870    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
871        // ignore variables
872        if name.starts_with('%') {
873            return;
874        }
875        self.notify_observers(ASKitEvent::Board(name, value));
876    }
877
878    fn notify_observers(&self, event: ASKitEvent) {
879        let observers = self.observers.lock().unwrap();
880        for (_id, observer) in observers.iter() {
881            observer.notify(&event);
882        }
883    }
884}
885
886#[derive(Clone, Debug)]
887pub enum ASKitEvent {
888    AgentDisplay(String, String, AgentValue), // (agent_id, key, value)
889    AgentError(String, String),               // (agent_id, message)
890    AgentIn(String, String),                  // (agent_id, pin)
891    Board(String, AgentValue),                // (board name, value)
892}
893
894pub trait ASKitObserver {
895    fn notify(&self, event: &ASKitEvent);
896}
897
898static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
899
900fn new_observer_id() -> usize {
901    OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
902}
903
904// Agent Message
905
906#[derive(Clone)]
907pub enum AgentMessageSender {
908    Sync(std::sync::mpsc::Sender<AgentMessage>),
909    Async(mpsc::Sender<AgentMessage>),
910}