agent_stream_kit/
askit.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::registry;
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentDefaultConfigs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
14use crate::message::{self, AgentEventMessage};
15use crate::value::AgentValue;
16
/// Shared state of the agent runtime: agents, their message senders, board
/// data, edges, definitions, flows, global configs, the event sender and the
/// registered observers.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` yields another
/// handle to the same underlying state rather than an independent copy.
#[derive(Clone)]
pub struct ASKit {
    // agent id -> agent
    pub(crate) agents:
        Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,

    // agent id -> sender
    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,

    // board name -> [board out agent id]
    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,

    // board name -> value
    pub(crate) board_value: Arc<Mutex<HashMap<String, AgentValue>>>,

    // source agent id -> [(target agent id, source handle, target handle)]
    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,

    // agent def name -> agent definition
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    // agent flows
    pub(crate) flows: Arc<Mutex<AgentFlows>>,

    // agent def name -> config
    pub(crate) global_configs_map: Arc<Mutex<HashMap<String, AgentConfigs>>>,

    // message sender; `None` until the message loop is spawned and after `quit`
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    // observer id -> observer
    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
}
50
51impl ASKit {
52    pub fn new() -> Self {
53        Self {
54            agents: Default::default(),
55            agent_txs: Default::default(),
56            board_out_agents: Default::default(),
57            board_value: Default::default(),
58            edges: Default::default(),
59            defs: Default::default(),
60            flows: Default::default(),
61            global_configs_map: Default::default(),
62            tx: Arc::new(Mutex::new(None)),
63            observers: Default::default(),
64        }
65    }
66
67    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68        self.tx
69            .lock()
70            .unwrap()
71            .clone()
72            .ok_or(AgentError::TxNotInitialized)
73    }
74
75    pub fn init() -> Result<Self, AgentError> {
76        let askit = Self::new();
77        askit.register_agents();
78        Ok(askit)
79    }
80
    /// Registers agents on this instance by delegating to
    /// `registry::register_inventory_agents`.
    fn register_agents(&self) {
        registry::register_inventory_agents(self);
    }
84
85    pub async fn ready(&self) -> Result<(), AgentError> {
86        self.spawn_message_loop()?;
87        self.start_agent_flows().await?;
88        Ok(())
89    }
90
91    pub fn quit(&self) {
92        let mut tx_lock = self.tx.lock().unwrap();
93        *tx_lock = None;
94    }
95
96    pub fn register_agent(&self, def: AgentDefinition) {
97        let def_name = def.name.clone();
98        let def_global_configs = def.global_configs.clone();
99
100        let mut defs = self.defs.lock().unwrap();
101        defs.insert(def.name.clone(), def);
102
103        // if there is a global config, set it
104        if let Some(def_global_configs) = def_global_configs {
105            let mut new_configs = AgentConfigs::default();
106            for (key, config_entry) in def_global_configs.iter() {
107                new_configs.set(key.clone(), config_entry.value.clone());
108            }
109            self.set_global_configs(def_name, new_configs);
110        }
111    }
112
113    pub fn get_agent_definitions(&self) -> AgentDefinitions {
114        let defs = self.defs.lock().unwrap();
115        defs.clone()
116    }
117
118    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119        let defs = self.defs.lock().unwrap();
120        defs.get(def_name).cloned()
121    }
122
123    pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentDefaultConfigs> {
124        let defs = self.defs.lock().unwrap();
125        let Some(def) = defs.get(def_name) else {
126            return None;
127        };
128        def.default_configs.clone()
129    }
130
    // flow
132
133    pub fn get_agent_flows(&self) -> AgentFlows {
134        let flows = self.flows.lock().unwrap();
135        flows.clone()
136    }
137
138    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139        if !Self::is_valid_flow_name(name) {
140            return Err(AgentError::InvalidFlowName(name.into()));
141        }
142
143        let new_name = self.unique_flow_name(name);
144        let mut flows = self.flows.lock().unwrap();
145        let flow = AgentFlow::new(new_name.clone());
146        flows.insert(new_name, flow.clone());
147        Ok(flow)
148    }
149
150    pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
151        if !Self::is_valid_flow_name(new_name) {
152            return Err(AgentError::InvalidFlowName(new_name.into()));
153        }
154
155        // check if the new name is already used
156        let new_name = self.unique_flow_name(new_name);
157
158        let mut flows = self.flows.lock().unwrap();
159
160        // remove the original flow
161        let Some(mut flow) = flows.remove(old_name) else {
162            return Err(AgentError::RenameFlowFailed(old_name.into()));
163        };
164
165        // insert renamed flow
166        flow.set_name(new_name.clone());
167        flows.insert(new_name.clone(), flow);
168        Ok(new_name)
169    }
170
171    fn is_valid_flow_name(new_name: &str) -> bool {
172        // Check if the name is empty
173        if new_name.trim().is_empty() {
174            return false;
175        }
176
177        // Checks for path-like names:
178        if new_name.contains('/') {
179            // Disallow leading, trailing, or consecutive slashes
180            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
181                return false;
182            }
183            // Disallow segments that are "." or ".."
184            if new_name
185                .split('/')
186                .any(|segment| segment == "." || segment == "..")
187            {
188                return false;
189            }
190        }
191
192        // Check if the name contains invalid characters
193        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
194        for c in invalid_chars {
195            if new_name.contains(c) {
196                return false;
197            }
198        }
199
200        true
201    }
202
203    pub fn unique_flow_name(&self, name: &str) -> String {
204        let mut new_name = name.trim().to_string();
205        let mut i = 2;
206        let flows = self.flows.lock().unwrap();
207        while flows.contains_key(&new_name) {
208            new_name = format!("{}{}", name, i);
209            i += 1;
210        }
211        new_name
212    }
213
214    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215        let name = agent_flow.name();
216
217        // add the given flow into flows
218        {
219            let mut flows = self.flows.lock().unwrap();
220            if flows.contains_key(name) {
221                return Err(AgentError::DuplicateFlowName(name.into()));
222            }
223            flows.insert(name.into(), agent_flow.clone());
224        }
225
226        // add nodes into agents
227        for node in agent_flow.nodes().iter() {
228            self.add_agent(name, node).unwrap_or_else(|e| {
229                log::error!("Failed to add_agent_node {}: {}", node.id, e);
230            });
231        }
232
233        // add edges into edges
234        for edge in agent_flow.edges().iter() {
235            self.add_edge(edge).unwrap_or_else(|e| {
236                log::error!("Failed to add_edge {}: {}", edge.source, e);
237            });
238        }
239
240        Ok(())
241    }
242
243    pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
244        let flow = {
245            let mut flows = self.flows.lock().unwrap();
246            let Some(flow) = flows.remove(flow_name) else {
247                return Err(AgentError::FlowNotFound(flow_name.to_string()));
248            };
249            flow.clone()
250        };
251
252        flow.stop(self).await?;
253
254        // Remove all nodes and edges associated with the flow
255        for node in flow.nodes() {
256            self.remove_agent(&node.id).await?;
257        }
258        for edge in flow.edges() {
259            self.remove_edge(edge);
260        }
261
262        Ok(())
263    }
264
265    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266        let flow_name = flow.name();
267
268        let mut flows = self.flows.lock().unwrap();
269        flows.insert(flow_name.to_string(), flow);
270        Ok(())
271    }
272
273    pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
274        let def = self
275            .get_agent_definition(def_name)
276            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
277        AgentFlowNode::new(&def)
278    }
279
280    pub fn add_agent_flow_node(
281        &self,
282        flow_name: &str,
283        node: &AgentFlowNode,
284    ) -> Result<(), AgentError> {
285        let mut flows = self.flows.lock().unwrap();
286        let Some(flow) = flows.get_mut(flow_name) else {
287            return Err(AgentError::FlowNotFound(flow_name.to_string()));
288        };
289        flow.add_node(node.clone());
290        self.add_agent(flow_name, node)?;
291        Ok(())
292    }
293
294    pub(crate) fn add_agent(
295        &self,
296        flow_name: &str,
297        node: &AgentFlowNode,
298    ) -> Result<(), AgentError> {
299        let mut agents = self.agents.lock().unwrap();
300        if agents.contains_key(&node.id) {
301            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
302        }
303        if let Ok(mut agent) = agent_new(
304            self.clone(),
305            node.id.clone(),
306            &node.def_name,
307            node.configs.clone(),
308        ) {
309            agent.set_flow_name(flow_name.to_string());
310            agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
311        } else {
312            return Err(AgentError::AgentCreationFailed(node.id.to_string()));
313        }
314        Ok(())
315    }
316
317    pub fn add_agent_flow_edge(
318        &self,
319        flow_name: &str,
320        edge: &AgentFlowEdge,
321    ) -> Result<(), AgentError> {
322        let mut flows = self.flows.lock().unwrap();
323        let Some(flow) = flows.get_mut(flow_name) else {
324            return Err(AgentError::FlowNotFound(flow_name.to_string()));
325        };
326        flow.add_edge(edge.clone());
327        self.add_edge(edge)?;
328        Ok(())
329    }
330
331    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
332        // check if the source agent exists
333        {
334            let agents = self.agents.lock().unwrap();
335            if !agents.contains_key(&edge.source) {
336                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
337            }
338        }
339
340        // check if handles are valid
341        if edge.source_handle.is_empty() {
342            return Err(AgentError::EmptySourceHandle);
343        }
344        if edge.target_handle.is_empty() {
345            return Err(AgentError::EmptyTargetHandle);
346        }
347
348        let mut edges = self.edges.lock().unwrap();
349        if let Some(targets) = edges.get_mut(&edge.source) {
350            if targets
351                .iter()
352                .any(|(target, source_handle, target_handle)| {
353                    *target == edge.target
354                        && *source_handle == edge.source_handle
355                        && *target_handle == edge.target_handle
356                })
357            {
358                return Err(AgentError::EdgeAlreadyExists);
359            }
360            targets.push((
361                edge.target.clone(),
362                edge.source_handle.clone(),
363                edge.target_handle.clone(),
364            ));
365        } else {
366            edges.insert(
367                edge.source.clone(),
368                vec![(
369                    edge.target.clone(),
370                    edge.source_handle.clone(),
371                    edge.target_handle.clone(),
372                )],
373            );
374        }
375        Ok(())
376    }
377
378    pub async fn remove_agent_flow_node(
379        &self,
380        flow_name: &str,
381        node_id: &str,
382    ) -> Result<(), AgentError> {
383        {
384            let mut flows = self.flows.lock().unwrap();
385            let Some(flow) = flows.get_mut(flow_name) else {
386                return Err(AgentError::FlowNotFound(flow_name.to_string()));
387            };
388            flow.remove_node(node_id);
389        }
390        self.remove_agent(node_id).await?;
391        Ok(())
392    }
393
394    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
395        self.stop_agent(agent_id).await?;
396
397        // remove from edges
398        {
399            let mut edges = self.edges.lock().unwrap();
400            let mut sources_to_remove = Vec::new();
401            for (source, targets) in edges.iter_mut() {
402                targets.retain(|(target, _, _)| target != agent_id);
403                if targets.is_empty() {
404                    sources_to_remove.push(source.clone());
405                }
406            }
407            for source in sources_to_remove {
408                edges.remove(&source);
409            }
410            edges.remove(agent_id);
411        }
412
413        // remove from agents
414        {
415            let mut agents = self.agents.lock().unwrap();
416            agents.remove(agent_id);
417        }
418
419        Ok(())
420    }
421
422    pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
423        let mut flows = self.flows.lock().unwrap();
424        let Some(flow) = flows.get_mut(flow_name) else {
425            return Err(AgentError::FlowNotFound(flow_name.to_string()));
426        };
427        let Some(edge) = flow.remove_edge(edge_id) else {
428            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
429        };
430        self.remove_edge(&edge);
431        Ok(())
432    }
433
434    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
435        let mut edges = self.edges.lock().unwrap();
436        if let Some(targets) = edges.get_mut(&edge.source) {
437            targets.retain(|(target, source_handle, target_handle)| {
438                *target != edge.target
439                    || *source_handle != edge.source_handle
440                    || *target_handle != edge.target_handle
441            });
442            if targets.is_empty() {
443                edges.remove(&edge.source);
444            }
445        }
446    }
447
    /// Copies the given nodes and edges by delegating to `flow::copy_sub_flow`
    /// and returns the resulting node and edge lists.
    ///
    /// This does not read or modify any state held by `self`.
    pub fn copy_sub_flow(
        &self,
        nodes: &Vec<AgentFlowNode>,
        edges: &Vec<AgentFlowEdge>,
    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
        flow::copy_sub_flow(nodes, edges)
    }
455
456    pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
457        let flow = {
458            let flows = self.flows.lock().unwrap();
459            let Some(flow) = flows.get(name) else {
460                return Err(AgentError::FlowNotFound(name.to_string()));
461            };
462            flow.clone()
463        };
464        flow.start(self).await?;
465        Ok(())
466    }
467
468    pub async fn stop_agent_flow(&self, name: &str) -> Result<(), AgentError> {
469        let flow = {
470            let flows = self.flows.lock().unwrap();
471            let Some(flow) = flows.get(name) else {
472                return Err(AgentError::FlowNotFound(name.to_string()));
473            };
474            flow.clone()
475        };
476        flow.stop(self).await?;
477        Ok(())
478    }
479
480    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
481        let agent = {
482            let agents = self.agents.lock().unwrap();
483            let Some(a) = agents.get(agent_id) else {
484                return Err(AgentError::AgentNotFound(agent_id.to_string()));
485            };
486            a.clone()
487        };
488        let def_name = {
489            let agent = agent.lock().await;
490            agent.def_name().to_string()
491        };
492        let uses_native_thread = {
493            let defs = self.defs.lock().unwrap();
494            let Some(def) = defs.get(&def_name) else {
495                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
496            };
497            def.native_thread
498        };
499        let agent_status = {
500            let agent = agent.lock().await;
501            agent.status().clone()
502        };
503        if agent_status == AgentStatus::Init {
504            log::info!("Starting agent {}", agent_id);
505
506            if uses_native_thread {
507                let (tx, rx) = std::sync::mpsc::channel();
508
509                {
510                    let mut agent_txs = self.agent_txs.lock().unwrap();
511                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
512                };
513
514                let agent_id = agent_id.to_string();
515                std::thread::spawn(async move || {
516                    if let Err(e) = agent.lock().await.start().await {
517                        log::error!("Failed to start agent {}: {}", agent_id, e);
518                    }
519
520                    while let Ok(message) = rx.recv() {
521                        match message {
522                            AgentMessage::Input { ctx, pin, value } => {
523                                agent
524                                    .lock()
525                                    .await
526                                    .process(ctx, pin, value)
527                                    .await
528                                    .unwrap_or_else(|e| {
529                                        log::error!("Process Error {}: {}", agent_id, e);
530                                    });
531                            }
532                            AgentMessage::Config { configs } => {
533                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
534                                    log::error!("Config Error {}: {}", agent_id, e);
535                                });
536                            }
537                            AgentMessage::Stop => {
538                                break;
539                            }
540                        }
541                    }
542                });
543            } else {
544                let (tx, mut rx) = mpsc::channel(32);
545
546                {
547                    let mut agent_txs = self.agent_txs.lock().unwrap();
548                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
549                };
550
551                let agent_id = agent_id.to_string();
552                tokio::spawn(async move {
553                    {
554                        let mut agent_guard = agent.lock().await;
555                        if let Err(e) = agent_guard.start().await {
556                            log::error!("Failed to start agent {}: {}", agent_id, e);
557                        }
558                    }
559
560                    while let Some(message) = rx.recv().await {
561                        match message {
562                            AgentMessage::Input { ctx, pin, value } => {
563                                agent
564                                    .lock()
565                                    .await
566                                    .process(ctx, pin, value)
567                                    .await
568                                    .unwrap_or_else(|e| {
569                                        log::error!("Process Error {}: {}", agent_id, e);
570                                    });
571                            }
572                            AgentMessage::Config { configs } => {
573                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
574                                    log::error!("Config Error {}: {}", agent_id, e);
575                                });
576                            }
577                            AgentMessage::Stop => {
578                                rx.close();
579                                return;
580                            }
581                        }
582                    }
583                });
584            }
585        }
586        Ok(())
587    }
588
589    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
590        let agent = {
591            let agents = self.agents.lock().unwrap();
592            let Some(a) = agents.get(agent_id) else {
593                return Err(AgentError::AgentNotFound(agent_id.to_string()));
594            };
595            a.clone()
596        };
597
598        let agent_status = {
599            let agent = agent.lock().await;
600            agent.status().clone()
601        };
602        if agent_status == AgentStatus::Start {
603            log::info!("Stopping agent {}", agent_id);
604
605            {
606                let mut agent_txs = self.agent_txs.lock().unwrap();
607                if let Some(tx) = agent_txs.remove(agent_id) {
608                    match tx {
609                        AgentMessageSender::Sync(tx) => {
610                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
611                                log::error!(
612                                    "Failed to send stop message to agent {}: {}",
613                                    agent_id,
614                                    e
615                                );
616                            });
617                        }
618                        AgentMessageSender::Async(tx) => {
619                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
620                                log::error!(
621                                    "Failed to send stop message to agent {}: {}",
622                                    agent_id,
623                                    e
624                                );
625                            });
626                        }
627                    }
628                }
629            }
630
631            agent.lock().await.stop().await?;
632        }
633
634        Ok(())
635    }
636
637    pub async fn set_agent_configs(
638        &self,
639        agent_id: String,
640        configs: AgentConfigs,
641    ) -> Result<(), AgentError> {
642        let agent = {
643            let agents = self.agents.lock().unwrap();
644            let Some(a) = agents.get(&agent_id) else {
645                return Err(AgentError::AgentNotFound(agent_id.to_string()));
646            };
647            a.clone()
648        };
649
650        let agent_status = {
651            let agent = agent.lock().await;
652            agent.status().clone()
653        };
654        if agent_status == AgentStatus::Init {
655            agent.lock().await.set_configs(configs.clone())?;
656        } else if agent_status == AgentStatus::Start {
657            let tx = {
658                let agent_txs = self.agent_txs.lock().unwrap();
659                let Some(tx) = agent_txs.get(&agent_id) else {
660                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
661                };
662                tx.clone()
663            };
664            let message = AgentMessage::Config { configs };
665            match tx {
666                AgentMessageSender::Sync(tx) => {
667                    tx.send(message).map_err(|_| {
668                        AgentError::SendMessageFailed("Failed to send config message".to_string())
669                    })?;
670                }
671                AgentMessageSender::Async(tx) => {
672                    tx.send(message).await.map_err(|_| {
673                        AgentError::SendMessageFailed("Failed to send config message".to_string())
674                    })?;
675                }
676            }
677        }
678        Ok(())
679    }
680
681    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
682        let global_configs_map = self.global_configs_map.lock().unwrap();
683        global_configs_map.get(def_name).cloned()
684    }
685
686    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
687        let mut global_configs_map = self.global_configs_map.lock().unwrap();
688
689        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
690            global_configs_map.insert(def_name, configs);
691            return;
692        };
693
694        for (key, value) in configs {
695            existing_configs.set(key, value);
696        }
697    }
698
699    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
700        for (agent_name, new_configs) in new_configs_map {
701            self.set_global_configs(agent_name, new_configs);
702        }
703    }
704
705    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
706        let global_configs_map = self.global_configs_map.lock().unwrap();
707        global_configs_map.clone()
708    }
709
710    pub(crate) async fn agent_input(
711        &self,
712        agent_id: String,
713        ctx: AgentContext,
714        pin: String,
715        value: AgentValue,
716    ) -> Result<(), AgentError> {
717        let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
718            let agents = self.agents.lock().unwrap();
719            let Some(a) = agents.get(&agent_id) else {
720                return Err(AgentError::AgentNotFound(agent_id.to_string()));
721            };
722            a.clone()
723        };
724
725        let agent_status = {
726            let agent = agent.lock().await;
727            agent.status().clone()
728        };
729        if agent_status != AgentStatus::Start {
730            return Ok(());
731        }
732
733        if pin.starts_with("config:") {
734            let config_key = pin[7..].to_string();
735            let mut agent = agent.lock().await;
736            agent.set_config(config_key.clone(), value.clone())?;
737            return Ok(());
738        }
739
740        let message = AgentMessage::Input {
741            ctx,
742            pin: pin.clone(),
743            value,
744        };
745
746        let tx = {
747            let agent_txs = self.agent_txs.lock().unwrap();
748            let Some(tx) = agent_txs.get(&agent_id) else {
749                return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
750            };
751            tx.clone()
752        };
753        match tx {
754            AgentMessageSender::Sync(tx) => {
755                tx.send(message).map_err(|_| {
756                    AgentError::SendMessageFailed("Failed to send input message".to_string())
757                })?;
758            }
759            AgentMessageSender::Async(tx) => {
760                tx.send(message).await.map_err(|_| {
761                    AgentError::SendMessageFailed("Failed to send input message".to_string())
762                })?;
763            }
764        }
765        self.emit_agent_input(agent_id.to_string(), pin);
766
767        Ok(())
768    }
769
    /// Sends `value` as output of agent `agent_id` on pin `pin` by delegating
    /// to `message::send_agent_out` and awaiting its result.
    pub async fn send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::send_agent_out(self, agent_id, ctx, pin, value).await
    }
779
    /// Synchronous counterpart of `send_agent_out`; delegates to
    /// `message::try_send_agent_out`.
    pub fn try_send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_agent_out(self, agent_id, ctx, pin, value)
    }
789
    /// Sends `value` to board `name` with a freshly created `AgentContext`,
    /// via `try_send_board_out`.
    pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
        self.try_send_board_out(name, AgentContext::new(), value)
    }
793
    /// Sends `value` to board `name` under context `ctx` by delegating to
    /// `message::try_send_board_out`.
    pub(crate) fn try_send_board_out(
        &self,
        name: String,
        ctx: AgentContext,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_board_out(self, name, ctx, value)
    }
802
803    fn spawn_message_loop(&self) -> Result<(), AgentError> {
804        // TODO: settings for the channel size
805        let (tx, mut rx) = mpsc::channel(4096);
806        {
807            let mut tx_lock = self.tx.lock().unwrap();
808            *tx_lock = Some(tx);
809        }
810
811        // spawn the main loop
812        let askit = self.clone();
813        tokio::spawn(async move {
814            while let Some(message) = rx.recv().await {
815                use AgentEventMessage::*;
816
817                match message {
818                    AgentOut {
819                        agent,
820                        ctx,
821                        pin,
822                        value,
823                    } => {
824                        message::agent_out(&askit, agent, ctx, pin, value).await;
825                    }
826                    BoardOut { name, ctx, value } => {
827                        message::board_out(&askit, name, ctx, value).await;
828                    }
829                }
830            }
831        });
832
833        Ok(())
834    }
835
836    async fn start_agent_flows(&self) -> Result<(), AgentError> {
837        let agent_flow_names;
838        {
839            let agent_flows = self.flows.lock().unwrap();
840            agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
841        }
842        for name in agent_flow_names {
843            self.start_agent_flow(&name).await.unwrap_or_else(|e| {
844                log::error!("Failed to start agent flow: {}", e);
845            });
846        }
847        Ok(())
848    }
849
850    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
851        let mut observers = self.observers.lock().unwrap();
852        let observer_id = new_observer_id();
853        observers.insert(observer_id, observer);
854        observer_id
855    }
856
857    pub fn unsubscribe(&self, observer_id: usize) {
858        let mut observers = self.observers.lock().unwrap();
859        observers.remove(&observer_id);
860    }
861
862    pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
863        self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
864    }
865
866    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
867        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
868    }
869
870    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
871        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
872    }
873
874    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
875        self.notify_observers(ASKitEvent::Board(name, value));
876    }
877
878    fn notify_observers(&self, event: ASKitEvent) {
879        let observers = self.observers.lock().unwrap();
880        for (_id, observer) in observers.iter() {
881            observer.notify(&event);
882        }
883    }
884}
885
/// Events delivered to `ASKitObserver`s.
#[derive(Clone, Debug)]
pub enum ASKitEvent {
    AgentDisplay(String, String, AgentValue), // (agent_id, key, value)
    AgentError(String, String),               // (agent_id, message)
    AgentIn(String, String),                  // (agent_id, pin)
    Board(String, AgentValue),                // (board name, value)
}
893
/// Receiver of `ASKitEvent`s; register one with `ASKit::subscribe`.
pub trait ASKitObserver {
    /// Called once for each emitted event.
    fn notify(&self, event: &ASKitEvent);
}
897
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next observer id and advances the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
903
904// Agent Message
905
/// Sending half of an agent's message channel.
#[derive(Clone)]
pub enum AgentMessageSender {
    /// `std::sync::mpsc` sender; used for agents started on a native thread.
    Sync(std::sync::mpsc::Sender<AgentMessage>),
    /// Tokio `mpsc` sender; used for agents started as a Tokio task.
    Async(mpsc::Sender<AgentMessage>),
}