agent_stream_kit/
askit.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentDefaultConfigs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::value::AgentValue;
16
17const MESSAGE_LIMIT: usize = 1024;
18
19#[derive(Clone)]
20pub struct ASKit {
21    // agent id -> agent
22    pub(crate) agents: Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
23
24    // agent id -> sender
25    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
26
27    // board name -> [board out agent id]
28    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
29
30    // board name -> value
31    pub(crate) board_value: Arc<Mutex<HashMap<String, AgentValue>>>,
32
33    // source agent id -> [target agent id / source handle / target handle]
34    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
35
36    // agent def name -> agent definition
37    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
38
39    // agent flows (flow id -> flow)
40    pub(crate) flows: Arc<Mutex<AgentFlows>>,
41
42    // agent def name -> config
43    pub(crate) global_configs_map: Arc<Mutex<HashMap<String, AgentConfigs>>>,
44
45    // message sender
46    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
47
48    // observers
49    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
50}
51
52impl ASKit {
53    pub fn new() -> Self {
54        Self {
55            agents: Default::default(),
56            agent_txs: Default::default(),
57            board_out_agents: Default::default(),
58            board_value: Default::default(),
59            edges: Default::default(),
60            defs: Default::default(),
61            flows: Default::default(),
62            global_configs_map: Default::default(),
63            tx: Arc::new(Mutex::new(None)),
64            observers: Default::default(),
65        }
66    }
67
68    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69        self.tx
70            .lock()
71            .unwrap()
72            .clone()
73            .ok_or(AgentError::TxNotInitialized)
74    }
75
76    pub fn init() -> Result<Self, AgentError> {
77        let askit = Self::new();
78        askit.register_agents();
79        Ok(askit)
80    }
81
82    fn register_agents(&self) {
83        registry::register_inventory_agents(self);
84    }
85
86    pub async fn ready(&self) -> Result<(), AgentError> {
87        self.spawn_message_loop().await?;
88        self.start_agent_flows().await?;
89        Ok(())
90    }
91
92    pub fn quit(&self) {
93        let mut tx_lock = self.tx.lock().unwrap();
94        *tx_lock = None;
95    }
96
97    pub fn register_agent(&self, def: AgentDefinition) {
98        let def_name = def.name.clone();
99        let def_global_configs = def.global_configs.clone();
100
101        let mut defs = self.defs.lock().unwrap();
102        defs.insert(def.name.clone(), def);
103
104        // if there is a global config, set it
105        if let Some(def_global_configs) = def_global_configs {
106            let mut new_configs = AgentConfigs::default();
107            for (key, config_entry) in def_global_configs.iter() {
108                new_configs.set(key.clone(), config_entry.value.clone());
109            }
110            self.set_global_configs(def_name, new_configs);
111        }
112    }
113
114    pub fn get_agent_definitions(&self) -> AgentDefinitions {
115        let defs = self.defs.lock().unwrap();
116        defs.clone()
117    }
118
119    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
120        let defs = self.defs.lock().unwrap();
121        defs.get(def_name).cloned()
122    }
123
124    pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentDefaultConfigs> {
125        let defs = self.defs.lock().unwrap();
126        let Some(def) = defs.get(def_name) else {
127            return None;
128        };
129        def.default_configs.clone()
130    }
131
132    // flows
133
134    pub fn get_agent_flows(&self) -> AgentFlows {
135        let flows = self.flows.lock().unwrap();
136        flows.clone()
137    }
138
139    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
140        if !Self::is_valid_flow_name(name) {
141            return Err(AgentError::InvalidFlowName(name.into()));
142        }
143
144        let new_name = self.unique_flow_name(name);
145        let mut flows = self.flows.lock().unwrap();
146        let flow = AgentFlow::new(new_name.clone());
147        flows.insert(flow.id().to_string(), flow.clone());
148        Ok(flow)
149    }
150
151    pub fn rename_agent_flow(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
152        if !Self::is_valid_flow_name(new_name) {
153            return Err(AgentError::InvalidFlowName(new_name.into()));
154        }
155
156        // check if the new name is already used
157        let new_name = self.unique_flow_name(new_name);
158
159        let mut flows = self.flows.lock().unwrap();
160
161        // remove the original flow
162        let Some(mut flow) = flows.remove(id) else {
163            return Err(AgentError::RenameFlowFailed(id.into()));
164        };
165
166        // insert renamed flow
167        flow.set_name(new_name.clone());
168        flows.insert(flow.id().to_string(), flow);
169        Ok(new_name)
170    }
171
172    fn is_valid_flow_name(new_name: &str) -> bool {
173        // Check if the name is empty
174        if new_name.trim().is_empty() {
175            return false;
176        }
177
178        // Checks for path-like names:
179        if new_name.contains('/') {
180            // Disallow leading, trailing, or consecutive slashes
181            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
182                return false;
183            }
184            // Disallow segments that are "." or ".."
185            if new_name
186                .split('/')
187                .any(|segment| segment == "." || segment == "..")
188            {
189                return false;
190            }
191        }
192
193        // Check if the name contains invalid characters
194        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
195        for c in invalid_chars {
196            if new_name.contains(c) {
197                return false;
198            }
199        }
200
201        true
202    }
203
204    pub fn unique_flow_name(&self, name: &str) -> String {
205        let mut new_name = name.trim().to_string();
206        let mut i = 2;
207        let flows = self.flows.lock().unwrap();
208        while flows.values().any(|flow| flow.name() == new_name) {
209            new_name = format!("{}{}", name, i);
210            i += 1;
211        }
212        new_name
213    }
214
215    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
216        let id = agent_flow.id();
217
218        // add the given flow into flows
219        {
220            let mut flows = self.flows.lock().unwrap();
221            if flows.contains_key(id) {
222                return Err(AgentError::DuplicateId(id.into()));
223            }
224            flows.insert(id.to_string(), agent_flow.clone());
225        }
226
227        // add nodes into agents
228        for node in agent_flow.nodes().iter() {
229            if let Err(e) = self.add_agent(id, node) {
230                log::error!("Failed to add_agent_node {}: {}", node.id, e);
231            }
232        }
233
234        // add edges into edges
235        for edge in agent_flow.edges().iter() {
236            self.add_edge(edge).unwrap_or_else(|e| {
237                log::error!("Failed to add_edge {}: {}", edge.source, e);
238            });
239        }
240
241        Ok(())
242    }
243
244    pub async fn remove_agent_flow(&self, id: &str) -> Result<(), AgentError> {
245        let flow = {
246            let mut flows = self.flows.lock().unwrap();
247            let Some(flow) = flows.remove(id) else {
248                return Err(AgentError::FlowNotFound(id.to_string()));
249            };
250            flow.clone()
251        };
252
253        flow.stop(self).await?;
254
255        // Remove all nodes and edges associated with the flow
256        for node in flow.nodes() {
257            self.remove_agent(&node.id).await?;
258        }
259        for edge in flow.edges() {
260            self.remove_edge(edge);
261        }
262
263        Ok(())
264    }
265
266    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
267        let flow_id = flow.id();
268
269        let mut flows = self.flows.lock().unwrap();
270        flows.insert(flow_id.to_string(), flow);
271        Ok(())
272    }
273
274    pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
275        let def = self
276            .get_agent_definition(def_name)
277            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
278        AgentFlowNode::new(&def)
279    }
280
281    pub fn add_agent_flow_node(
282        &self,
283        flow_id: &str,
284        node: &AgentFlowNode,
285    ) -> Result<(), AgentError> {
286        let mut flows = self.flows.lock().unwrap();
287        let Some(flow) = flows.get_mut(flow_id) else {
288            return Err(AgentError::FlowNotFound(flow_id.to_string()));
289        };
290        flow.add_node(node.clone());
291        self.add_agent(flow_id, node)
292    }
293
294    pub(crate) fn add_agent(&self, flow_id: &str, node: &AgentFlowNode) -> Result<(), AgentError> {
295        let mut agents = self.agents.lock().unwrap();
296        if agents.contains_key(&node.id) {
297            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
298        }
299        let mut agent = agent_new(
300            self.clone(),
301            node.id.clone(),
302            &node.def_name,
303            node.configs.clone(),
304        )?;
305        agent.set_flow_id(flow_id.to_string());
306        agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
307        Ok(())
308    }
309
310    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
311        let agents = self.agents.lock().unwrap();
312        agents.get(agent_id).cloned()
313    }
314
315    pub fn add_agent_flow_edge(
316        &self,
317        flow_id: &str,
318        edge: &AgentFlowEdge,
319    ) -> Result<(), AgentError> {
320        let mut flows = self.flows.lock().unwrap();
321        let Some(flow) = flows.get_mut(flow_id) else {
322            return Err(AgentError::FlowNotFound(flow_id.to_string()));
323        };
324        flow.add_edge(edge.clone());
325        self.add_edge(edge)?;
326        Ok(())
327    }
328
329    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
330        // check if the source agent exists
331        {
332            let agents = self.agents.lock().unwrap();
333            if !agents.contains_key(&edge.source) {
334                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
335            }
336        }
337
338        // check if handles are valid
339        if edge.source_handle.is_empty() {
340            return Err(AgentError::EmptySourceHandle);
341        }
342        if edge.target_handle.is_empty() {
343            return Err(AgentError::EmptyTargetHandle);
344        }
345
346        let mut edges = self.edges.lock().unwrap();
347        if let Some(targets) = edges.get_mut(&edge.source) {
348            if targets
349                .iter()
350                .any(|(target, source_handle, target_handle)| {
351                    *target == edge.target
352                        && *source_handle == edge.source_handle
353                        && *target_handle == edge.target_handle
354                })
355            {
356                return Err(AgentError::EdgeAlreadyExists);
357            }
358            targets.push((
359                edge.target.clone(),
360                edge.source_handle.clone(),
361                edge.target_handle.clone(),
362            ));
363        } else {
364            edges.insert(
365                edge.source.clone(),
366                vec![(
367                    edge.target.clone(),
368                    edge.source_handle.clone(),
369                    edge.target_handle.clone(),
370                )],
371            );
372        }
373        Ok(())
374    }
375
376    pub async fn remove_agent_flow_node(
377        &self,
378        flow_id: &str,
379        node_id: &str,
380    ) -> Result<(), AgentError> {
381        {
382            let mut flows = self.flows.lock().unwrap();
383            let Some(flow) = flows.get_mut(flow_id) else {
384                return Err(AgentError::FlowNotFound(flow_id.to_string()));
385            };
386            flow.remove_node(node_id);
387        }
388        self.remove_agent(node_id).await?;
389        Ok(())
390    }
391
392    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
393        self.stop_agent(agent_id).await?;
394
395        // remove from edges
396        {
397            let mut edges = self.edges.lock().unwrap();
398            let mut sources_to_remove = Vec::new();
399            for (source, targets) in edges.iter_mut() {
400                targets.retain(|(target, _, _)| target != agent_id);
401                if targets.is_empty() {
402                    sources_to_remove.push(source.clone());
403                }
404            }
405            for source in sources_to_remove {
406                edges.remove(&source);
407            }
408            edges.remove(agent_id);
409        }
410
411        // remove from agents
412        {
413            let mut agents = self.agents.lock().unwrap();
414            agents.remove(agent_id);
415        }
416
417        Ok(())
418    }
419
420    pub fn remove_agent_flow_edge(&self, flow_id: &str, edge_id: &str) -> Result<(), AgentError> {
421        let mut flows = self.flows.lock().unwrap();
422        let Some(flow) = flows.get_mut(flow_id) else {
423            return Err(AgentError::FlowNotFound(flow_id.to_string()));
424        };
425        let Some(edge) = flow.remove_edge(edge_id) else {
426            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
427        };
428        self.remove_edge(&edge);
429        Ok(())
430    }
431
432    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
433        let mut edges = self.edges.lock().unwrap();
434        if let Some(targets) = edges.get_mut(&edge.source) {
435            targets.retain(|(target, source_handle, target_handle)| {
436                *target != edge.target
437                    || *source_handle != edge.source_handle
438                    || *target_handle != edge.target_handle
439            });
440            if targets.is_empty() {
441                edges.remove(&edge.source);
442            }
443        }
444    }
445
446    pub fn copy_sub_flow(
447        &self,
448        nodes: &Vec<AgentFlowNode>,
449        edges: &Vec<AgentFlowEdge>,
450    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
451        flow::copy_sub_flow(nodes, edges)
452    }
453
454    pub async fn start_agent_flow(&self, id: &str) -> Result<(), AgentError> {
455        let flow = {
456            let flows = self.flows.lock().unwrap();
457            let Some(flow) = flows.get(id) else {
458                return Err(AgentError::FlowNotFound(id.to_string()));
459            };
460            flow.clone()
461        };
462        flow.start(self).await?;
463        Ok(())
464    }
465
466    pub async fn stop_agent_flow(&self, id: &str) -> Result<(), AgentError> {
467        let flow = {
468            let flows = self.flows.lock().unwrap();
469            let Some(flow) = flows.get(id) else {
470                return Err(AgentError::FlowNotFound(id.to_string()));
471            };
472            flow.clone()
473        };
474        flow.stop(self).await?;
475        Ok(())
476    }
477
478    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
479        let agent = {
480            let agents = self.agents.lock().unwrap();
481            let Some(a) = agents.get(agent_id) else {
482                return Err(AgentError::AgentNotFound(agent_id.to_string()));
483            };
484            a.clone()
485        };
486        let def_name = {
487            let agent = agent.lock().await;
488            agent.def_name().to_string()
489        };
490        let uses_native_thread = {
491            let defs = self.defs.lock().unwrap();
492            let Some(def) = defs.get(&def_name) else {
493                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
494            };
495            def.native_thread
496        };
497        let agent_status = {
498            let agent = agent.lock().await;
499            agent.status().clone()
500        };
501        if agent_status == AgentStatus::Init {
502            log::info!("Starting agent {}", agent_id);
503
504            if uses_native_thread {
505                let (tx, rx) = std::sync::mpsc::channel();
506
507                {
508                    let mut agent_txs = self.agent_txs.lock().unwrap();
509                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
510                };
511
512                let agent_id = agent_id.to_string();
513                std::thread::spawn(async move || {
514                    if let Err(e) = agent.lock().await.start().await {
515                        log::error!("Failed to start agent {}: {}", agent_id, e);
516                    }
517
518                    while let Ok(message) = rx.recv() {
519                        match message {
520                            AgentMessage::Input { ctx, pin, value } => {
521                                agent
522                                    .lock()
523                                    .await
524                                    .process(ctx, pin, value)
525                                    .await
526                                    .unwrap_or_else(|e| {
527                                        log::error!("Process Error {}: {}", agent_id, e);
528                                    });
529                            }
530                            AgentMessage::Config { configs } => {
531                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
532                                    log::error!("Config Error {}: {}", agent_id, e);
533                                });
534                            }
535                            AgentMessage::Stop => {
536                                break;
537                            }
538                        }
539                    }
540                });
541            } else {
542                let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
543
544                {
545                    let mut agent_txs = self.agent_txs.lock().unwrap();
546                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
547                };
548
549                let agent_id = agent_id.to_string();
550                tokio::spawn(async move {
551                    {
552                        let mut agent_guard = agent.lock().await;
553                        if let Err(e) = agent_guard.start().await {
554                            log::error!("Failed to start agent {}: {}", agent_id, e);
555                        }
556                    }
557
558                    while let Some(message) = rx.recv().await {
559                        match message {
560                            AgentMessage::Input { ctx, pin, value } => {
561                                agent
562                                    .lock()
563                                    .await
564                                    .process(ctx, pin, value)
565                                    .await
566                                    .unwrap_or_else(|e| {
567                                        log::error!("Process Error {}: {}", agent_id, e);
568                                    });
569                            }
570                            AgentMessage::Config { configs } => {
571                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
572                                    log::error!("Config Error {}: {}", agent_id, e);
573                                });
574                            }
575                            AgentMessage::Stop => {
576                                rx.close();
577                                return;
578                            }
579                        }
580                    }
581                });
582                tokio::task::yield_now().await;
583            }
584        }
585        Ok(())
586    }
587
588    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
589        let agent = {
590            let agents = self.agents.lock().unwrap();
591            let Some(a) = agents.get(agent_id) else {
592                return Err(AgentError::AgentNotFound(agent_id.to_string()));
593            };
594            a.clone()
595        };
596
597        let agent_status = {
598            let agent = agent.lock().await;
599            agent.status().clone()
600        };
601        if agent_status == AgentStatus::Start {
602            log::info!("Stopping agent {}", agent_id);
603
604            {
605                let mut agent_txs = self.agent_txs.lock().unwrap();
606                if let Some(tx) = agent_txs.remove(agent_id) {
607                    match tx {
608                        AgentMessageSender::Sync(tx) => {
609                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
610                                log::error!(
611                                    "Failed to send stop message to agent {}: {}",
612                                    agent_id,
613                                    e
614                                );
615                            });
616                        }
617                        AgentMessageSender::Async(tx) => {
618                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
619                                log::error!(
620                                    "Failed to send stop message to agent {}: {}",
621                                    agent_id,
622                                    e
623                                );
624                            });
625                        }
626                    }
627                }
628            }
629
630            agent.lock().await.stop().await?;
631        }
632
633        Ok(())
634    }
635
636    pub async fn set_agent_configs(
637        &self,
638        agent_id: String,
639        configs: AgentConfigs,
640    ) -> Result<(), AgentError> {
641        let agent = {
642            let agents = self.agents.lock().unwrap();
643            let Some(a) = agents.get(&agent_id) else {
644                return Err(AgentError::AgentNotFound(agent_id.to_string()));
645            };
646            a.clone()
647        };
648
649        let agent_status = {
650            let agent = agent.lock().await;
651            agent.status().clone()
652        };
653        if agent_status == AgentStatus::Init {
654            agent.lock().await.set_configs(configs.clone())?;
655        } else if agent_status == AgentStatus::Start {
656            let tx = {
657                let agent_txs = self.agent_txs.lock().unwrap();
658                let Some(tx) = agent_txs.get(&agent_id) else {
659                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
660                };
661                tx.clone()
662            };
663            let message = AgentMessage::Config { configs };
664            match tx {
665                AgentMessageSender::Sync(tx) => {
666                    tx.send(message).map_err(|_| {
667                        AgentError::SendMessageFailed("Failed to send config message".to_string())
668                    })?;
669                }
670                AgentMessageSender::Async(tx) => {
671                    tx.send(message).await.map_err(|_| {
672                        AgentError::SendMessageFailed("Failed to send config message".to_string())
673                    })?;
674                }
675            }
676        }
677        Ok(())
678    }
679
680    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
681        let global_configs_map = self.global_configs_map.lock().unwrap();
682        global_configs_map.get(def_name).cloned()
683    }
684
685    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
686        let mut global_configs_map = self.global_configs_map.lock().unwrap();
687
688        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
689            global_configs_map.insert(def_name, configs);
690            return;
691        };
692
693        for (key, value) in configs {
694            existing_configs.set(key, value);
695        }
696    }
697
698    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
699        let global_configs_map = self.global_configs_map.lock().unwrap();
700        global_configs_map.clone()
701    }
702
703    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
704        for (agent_name, new_configs) in new_configs_map {
705            self.set_global_configs(agent_name, new_configs);
706        }
707    }
708
709    pub async fn agent_input(
710        &self,
711        agent_id: String,
712        ctx: AgentContext,
713        pin: String,
714        value: AgentValue,
715    ) -> Result<(), AgentError> {
716        let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
717            let agents = self.agents.lock().unwrap();
718            let Some(a) = agents.get(&agent_id) else {
719                return Err(AgentError::AgentNotFound(agent_id.to_string()));
720            };
721            a.clone()
722        };
723
724        let agent_status = {
725            let agent = agent.lock().await;
726            agent.status().clone()
727        };
728        if agent_status != AgentStatus::Start {
729            return Ok(());
730        }
731
732        if pin.starts_with("config:") {
733            let config_key = pin[7..].to_string();
734            let mut agent = agent.lock().await;
735            agent.set_config(config_key.clone(), value.clone())?;
736            return Ok(());
737        }
738
739        let message = AgentMessage::Input {
740            ctx,
741            pin: pin.clone(),
742            value,
743        };
744
745        let tx = {
746            let agent_txs = self.agent_txs.lock().unwrap();
747            let Some(tx) = agent_txs.get(&agent_id) else {
748                return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
749            };
750            tx.clone()
751        };
752        match tx {
753            AgentMessageSender::Sync(tx) => {
754                tx.send(message).map_err(|_| {
755                    AgentError::SendMessageFailed("Failed to send input message".to_string())
756                })?;
757            }
758            AgentMessageSender::Async(tx) => {
759                tx.send(message).await.map_err(|_| {
760                    AgentError::SendMessageFailed("Failed to send input message".to_string())
761                })?;
762            }
763        }
764
765        self.emit_agent_input(agent_id.to_string(), pin);
766
767        Ok(())
768    }
769
770    pub async fn send_agent_out(
771        &self,
772        agent_id: String,
773        ctx: AgentContext,
774        pin: String,
775        value: AgentValue,
776    ) -> Result<(), AgentError> {
777        message::send_agent_out(self, agent_id, ctx, pin, value).await
778    }
779
780    pub fn try_send_agent_out(
781        &self,
782        agent_id: String,
783        ctx: AgentContext,
784        pin: String,
785        value: AgentValue,
786    ) -> Result<(), AgentError> {
787        message::try_send_agent_out(self, agent_id, ctx, pin, value)
788    }
789
790    pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
791        self.try_send_board_out(name, AgentContext::new(), value)
792    }
793
794    pub(crate) fn try_send_board_out(
795        &self,
796        name: String,
797        ctx: AgentContext,
798        value: AgentValue,
799    ) -> Result<(), AgentError> {
800        message::try_send_board_out(self, name, ctx, value)
801    }
802
803    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
804        // TODO: settings for the channel size
805        let (tx, mut rx) = mpsc::channel(4096);
806        {
807            let mut tx_lock = self.tx.lock().unwrap();
808            *tx_lock = Some(tx);
809        }
810
811        // spawn the main loop
812        let askit = self.clone();
813        tokio::spawn(async move {
814            while let Some(message) = rx.recv().await {
815                use AgentEventMessage::*;
816
817                match message {
818                    AgentOut {
819                        agent,
820                        ctx,
821                        pin,
822                        value,
823                    } => {
824                        message::agent_out(&askit, agent, ctx, pin, value).await;
825                    }
826                    BoardOut { name, ctx, value } => {
827                        message::board_out(&askit, name, ctx, value).await;
828                    }
829                }
830            }
831        });
832
833        tokio::task::yield_now().await;
834
835        Ok(())
836    }
837
838    async fn start_agent_flows(&self) -> Result<(), AgentError> {
839        let agent_flow_ids;
840        {
841            let agent_flows = self.flows.lock().unwrap();
842            agent_flow_ids = agent_flows.keys().cloned().collect::<Vec<_>>();
843        }
844        for id in agent_flow_ids {
845            self.start_agent_flow(&id).await.unwrap_or_else(|e| {
846                log::error!("Failed to start agent flow: {}", e);
847            });
848        }
849        Ok(())
850    }
851
852    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
853        let mut observers = self.observers.lock().unwrap();
854        let observer_id = new_observer_id();
855        observers.insert(observer_id, observer);
856        observer_id
857    }
858
859    pub fn unsubscribe(&self, observer_id: usize) {
860        let mut observers = self.observers.lock().unwrap();
861        observers.remove(&observer_id);
862    }
863
864    pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
865        self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
866    }
867
868    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
869        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
870    }
871
872    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
873        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
874    }
875
876    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
877        // ignore variables
878        if name.starts_with('%') {
879            return;
880        }
881        self.notify_observers(ASKitEvent::Board(name, value));
882    }
883
884    fn notify_observers(&self, event: ASKitEvent) {
885        let observers = self.observers.lock().unwrap();
886        for (_id, observer) in observers.iter() {
887            observer.notify(&event);
888        }
889    }
890}
891
892#[derive(Clone, Debug)]
893pub enum ASKitEvent {
894    AgentDisplay(String, String, AgentValue), // (agent_id, key, value)
895    AgentError(String, String),               // (agent_id, message)
896    AgentIn(String, String),                  // (agent_id, pin)
897    Board(String, AgentValue),                // (board name, value)
898}
899
900pub trait ASKitObserver {
901    fn notify(&self, event: &ASKitEvent);
902}
903
904static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
905
906fn new_observer_id() -> usize {
907    OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
908}
909
910// Agent Message
911
912#[derive(Clone)]
913pub enum AgentMessageSender {
914    Sync(std::sync::mpsc::Sender<AgentMessage>),
915    Async(mpsc::Sender<AgentMessage>),
916}