agent_stream_kit/
askit.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::board_agent;
9use crate::config::{AgentConfig, AgentConfigs};
10use crate::context::AgentContext;
11use crate::data::AgentData;
12use crate::definition::{AgentDefaultConfig, AgentDefinition, AgentDefinitions};
13use crate::error::AgentError;
14use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
15use crate::message::{self, AgentEventMessage};
16
/// Central registry and message hub of the agent stream kit.
///
/// Every field is an `Arc`-wrapped, mutex-guarded value, so cloning an `ASKit`
/// (it derives `Clone`) produces another handle to the same shared state.
#[derive(Clone)]
pub struct ASKit {
    /// Agent id -> agent instance; each agent sits behind its own async mutex.
    pub(crate) agents:
        Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,

    /// Agent id -> sender of that agent's message channel.
    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,

    /// Board name -> ids of the board-out agents attached to that board.
    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,

    /// Board name -> data stored for that board.
    pub(crate) board_data: Arc<Mutex<HashMap<String, AgentData>>>,

    /// Source agent id -> list of (target agent id, source handle, target handle).
    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,

    /// Agent definition name -> agent definition.
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    /// All known agent flows.
    pub(crate) flows: Arc<Mutex<AgentFlows>>,

    /// Agent definition name -> global config for that definition.
    pub(crate) global_configs: Arc<Mutex<HashMap<String, AgentConfig>>>,

    /// Sender of the main event channel; `None` until `spawn_message_loop`
    /// installs one, and `None` again after `quit`.
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    /// Observer id -> observer notified of `ASKitEvent`s.
    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
}
50
51impl ASKit {
52    pub fn new() -> Self {
53        Self {
54            agents: Default::default(),
55            agent_txs: Default::default(),
56            board_out_agents: Default::default(),
57            board_data: Default::default(),
58            edges: Default::default(),
59            defs: Default::default(),
60            flows: Default::default(),
61            global_configs: Default::default(),
62            tx: Arc::new(Mutex::new(None)),
63            observers: Default::default(),
64        }
65    }
66
67    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68        self.tx
69            .lock()
70            .unwrap()
71            .clone()
72            .ok_or(AgentError::TxNotInitialized)
73    }
74
75    pub fn init() -> Result<Self, AgentError> {
76        let askit = Self::new();
77        askit.register_agents();
78        Ok(askit)
79    }
80
    /// Registers the built-in agents by delegating to
    /// `board_agent::register_agents`.
    fn register_agents(&self) {
        board_agent::register_agents(self);
    }
84
85    pub async fn ready(&self) -> Result<(), AgentError> {
86        self.spawn_message_loop()?;
87        self.start_agent_flows().await?;
88        Ok(())
89    }
90
91    pub fn quit(&self) {
92        let mut tx_lock = self.tx.lock().unwrap();
93        *tx_lock = None;
94    }
95
96    pub fn register_agent(&self, def: AgentDefinition) {
97        let def_name = def.name.clone();
98        let def_global_config = def.global_config.clone();
99
100        let mut defs = self.defs.lock().unwrap();
101        defs.insert(def.name.clone(), def);
102
103        // if there is a global config, set it
104        if let Some(def_global_config) = def_global_config {
105            let mut new_config = AgentConfig::default();
106            for (key, config_entry) in def_global_config.iter() {
107                new_config.set(key.clone(), config_entry.value.clone());
108            }
109            self.set_global_config(def_name, new_config);
110        }
111    }
112
113    pub fn get_agent_definitions(&self) -> AgentDefinitions {
114        let defs = self.defs.lock().unwrap();
115        defs.clone()
116    }
117
118    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119        let defs = self.defs.lock().unwrap();
120        defs.get(def_name).cloned()
121    }
122
123    pub fn get_agent_default_config(&self, def_name: &str) -> Option<AgentDefaultConfig> {
124        let defs = self.defs.lock().unwrap();
125        let Some(def) = defs.get(def_name) else {
126            return None;
127        };
128        def.default_config.clone()
129    }
130
    // flow
132
133    pub fn get_agent_flows(&self) -> AgentFlows {
134        let flows = self.flows.lock().unwrap();
135        flows.clone()
136    }
137
138    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139        if !Self::is_valid_flow_name(name) {
140            return Err(AgentError::InvalidFlowName(name.into()));
141        }
142
143        let new_name = self.unique_flow_name(name);
144        let mut flows = self.flows.lock().unwrap();
145        let flow = AgentFlow::new(new_name.clone());
146        flows.insert(new_name, flow.clone());
147        Ok(flow)
148    }
149
150    pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
151        if !Self::is_valid_flow_name(new_name) {
152            return Err(AgentError::InvalidFlowName(new_name.into()));
153        }
154
155        // check if the new name is already used
156        let new_name = self.unique_flow_name(new_name);
157
158        let mut flows = self.flows.lock().unwrap();
159
160        // remove the original flow
161        let Some(mut flow) = flows.remove(old_name) else {
162            return Err(AgentError::RenameFlowFailed(old_name.into()));
163        };
164
165        // insert renamed flow
166        flow.set_name(new_name.clone());
167        flows.insert(new_name.clone(), flow);
168        Ok(new_name)
169    }
170
171    fn is_valid_flow_name(new_name: &str) -> bool {
172        // Check if the name is empty
173        if new_name.trim().is_empty() {
174            return false;
175        }
176
177        // Checks for path-like names:
178        if new_name.contains('/') {
179            // Disallow leading, trailing, or consecutive slashes
180            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
181                return false;
182            }
183            // Disallow segments that are "." or ".."
184            if new_name
185                .split('/')
186                .any(|segment| segment == "." || segment == "..")
187            {
188                return false;
189            }
190        }
191
192        // Check if the name contains invalid characters
193        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
194        for c in invalid_chars {
195            if new_name.contains(c) {
196                return false;
197            }
198        }
199
200        true
201    }
202
203    pub fn unique_flow_name(&self, name: &str) -> String {
204        let mut new_name = name.trim().to_string();
205        let mut i = 2;
206        let flows = self.flows.lock().unwrap();
207        while flows.contains_key(&new_name) {
208            new_name = format!("{}{}", name, i);
209            i += 1;
210        }
211        new_name
212    }
213
214    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215        let name = agent_flow.name();
216
217        // add the given flow into flows
218        {
219            let mut flows = self.flows.lock().unwrap();
220            if flows.contains_key(name) {
221                return Err(AgentError::DuplicateFlowName(name.into()));
222            }
223            flows.insert(name.into(), agent_flow.clone());
224        }
225
226        // add nodes into agents
227        for node in agent_flow.nodes().iter() {
228            self.add_agent(name, node).unwrap_or_else(|e| {
229                log::error!("Failed to add_agent_node {}: {}", node.id, e);
230            });
231        }
232
233        // add edges into edges
234        for edge in agent_flow.edges().iter() {
235            self.add_edge(edge).unwrap_or_else(|e| {
236                log::error!("Failed to add_edge {}: {}", edge.source, e);
237            });
238        }
239
240        Ok(())
241    }
242
243    pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
244        let flow = {
245            let mut flows = self.flows.lock().unwrap();
246            let Some(flow) = flows.remove(flow_name) else {
247                return Err(AgentError::FlowNotFound(flow_name.to_string()));
248            };
249            flow.clone()
250        };
251
252        flow.stop(self).await?;
253
254        // Remove all nodes and edges associated with the flow
255        for node in flow.nodes() {
256            self.remove_agent(&node.id).await?;
257        }
258        for edge in flow.edges() {
259            self.remove_edge(edge);
260        }
261
262        Ok(())
263    }
264
265    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266        let flow_name = flow.name();
267
268        let mut flows = self.flows.lock().unwrap();
269        flows.insert(flow_name.to_string(), flow);
270        Ok(())
271    }
272
273    pub fn new_agent_flow_node(
274        &self,
275        def_name: &str,
276    ) -> Result<AgentFlowNode, AgentError> {
277        let def = self.get_agent_definition(def_name).ok_or_else(|| {
278            AgentError::AgentDefinitionNotFound(def_name.to_string())
279        })?;
280        AgentFlowNode::new(&def)
281    }
282
283    pub fn add_agent_flow_node(
284        &self,
285        flow_name: &str,
286        node: &AgentFlowNode,
287    ) -> Result<(), AgentError> {
288        let mut flows = self.flows.lock().unwrap();
289        let Some(flow) = flows.get_mut(flow_name) else {
290            return Err(AgentError::FlowNotFound(flow_name.to_string()));
291        };
292        flow.add_node(node.clone());
293        self.add_agent(flow_name, node)?;
294        Ok(())
295    }
296
297    pub(crate) fn add_agent(
298        &self,
299        flow_name: &str,
300        node: &AgentFlowNode,
301    ) -> Result<(), AgentError> {
302        let mut agents = self.agents.lock().unwrap();
303        if agents.contains_key(&node.id) {
304            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
305        }
306        if let Ok(mut agent) = agent_new(
307            self.clone(),
308            node.id.clone(),
309            &node.def_name,
310            node.config.clone(),
311        ) {
312            agent.set_flow_name(flow_name.to_string());
313            agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
314        } else {
315            return Err(AgentError::AgentCreationFailed(node.id.to_string()));
316        }
317        Ok(())
318    }
319
320    pub fn add_agent_flow_edge(
321        &self,
322        flow_name: &str,
323        edge: &AgentFlowEdge,
324    ) -> Result<(), AgentError> {
325        let mut flows = self.flows.lock().unwrap();
326        let Some(flow) = flows.get_mut(flow_name) else {
327            return Err(AgentError::FlowNotFound(flow_name.to_string()));
328        };
329        flow.add_edge(edge.clone());
330        self.add_edge(edge)?;
331        Ok(())
332    }
333
334    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
335        // check if the source agent exists
336        {
337            let agents = self.agents.lock().unwrap();
338            if !agents.contains_key(&edge.source) {
339                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
340            }
341        }
342
343        // check if handles are valid
344        if edge.source_handle.is_empty() {
345            return Err(AgentError::EmptySourceHandle);
346        }
347        if edge.target_handle.is_empty() {
348            return Err(AgentError::EmptyTargetHandle);
349        }
350
351        let mut edges = self.edges.lock().unwrap();
352        if let Some(targets) = edges.get_mut(&edge.source) {
353            if targets
354                .iter()
355                .any(|(target, source_handle, target_handle)| {
356                    *target == edge.target
357                        && *source_handle == edge.source_handle
358                        && *target_handle == edge.target_handle
359                })
360            {
361                return Err(AgentError::EdgeAlreadyExists);
362            }
363            targets.push((
364                edge.target.clone(),
365                edge.source_handle.clone(),
366                edge.target_handle.clone(),
367            ));
368        } else {
369            edges.insert(
370                edge.source.clone(),
371                vec![(
372                    edge.target.clone(),
373                    edge.source_handle.clone(),
374                    edge.target_handle.clone(),
375                )],
376            );
377        }
378        Ok(())
379    }
380
381    pub async fn remove_agent_flow_node(
382        &self,
383        flow_name: &str,
384        node_id: &str,
385    ) -> Result<(), AgentError> {
386        {
387            let mut flows = self.flows.lock().unwrap();
388            let Some(flow) = flows.get_mut(flow_name) else {
389                return Err(AgentError::FlowNotFound(flow_name.to_string()));
390            };
391            flow.remove_node(node_id);
392        }
393        self.remove_agent(node_id).await?;
394        Ok(())
395    }
396
397    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
398        self.stop_agent(agent_id).await?;
399
400        // remove from edges
401        {
402            let mut edges = self.edges.lock().unwrap();
403            let mut sources_to_remove = Vec::new();
404            for (source, targets) in edges.iter_mut() {
405                targets.retain(|(target, _, _)| target != agent_id);
406                if targets.is_empty() {
407                    sources_to_remove.push(source.clone());
408                }
409            }
410            for source in sources_to_remove {
411                edges.remove(&source);
412            }
413            edges.remove(agent_id);
414        }
415
416        // remove from agents
417        {
418            let mut agents = self.agents.lock().unwrap();
419            agents.remove(agent_id);
420        }
421
422        Ok(())
423    }
424
425    pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
426        let mut flows = self.flows.lock().unwrap();
427        let Some(flow) = flows.get_mut(flow_name) else {
428            return Err(AgentError::FlowNotFound(flow_name.to_string()));
429        };
430        let Some(edge) = flow.remove_edge(edge_id) else {
431            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
432        };
433        self.remove_edge(&edge);
434        Ok(())
435    }
436
437    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
438        let mut edges = self.edges.lock().unwrap();
439        if let Some(targets) = edges.get_mut(&edge.source) {
440            targets.retain(|(target, source_handle, target_handle)| {
441                *target != edge.target
442                    || *source_handle != edge.source_handle
443                    || *target_handle != edge.target_handle
444            });
445            if targets.is_empty() {
446                edges.remove(&edge.source);
447            }
448        }
449    }
450
    /// Copies the given nodes and edges by delegating to `flow::copy_sub_flow`
    /// and returns the resulting (nodes, edges) pair.
    ///
    /// No state of `self` is read or modified.
    pub fn copy_sub_flow(
        &self,
        nodes: &Vec<AgentFlowNode>,
        edges: &Vec<AgentFlowEdge>,
    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
        flow::copy_sub_flow(nodes, edges)
    }
458
459    pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
460        let flow = {
461            let flows = self.flows.lock().unwrap();
462            let Some(flow) = flows.get(name) else {
463                return Err(AgentError::FlowNotFound(name.to_string()));
464            };
465            flow.clone()
466        };
467        flow.start(self).await?;
468        Ok(())
469    }
470
471    pub async fn stop_agent_flow(&self, name: &str) -> Result<(), AgentError> {
472        let flow = {
473            let flows = self.flows.lock().unwrap();
474            let Some(flow) = flows.get(name) else {
475                return Err(AgentError::FlowNotFound(name.to_string()));
476            };
477            flow.clone()
478        };
479        flow.stop(self).await?;
480        Ok(())
481    }
482
483    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
484        let agent = {
485            let agents = self.agents.lock().unwrap();
486            let Some(a) = agents.get(agent_id) else {
487                return Err(AgentError::AgentNotFound(agent_id.to_string()));
488            };
489            a.clone()
490        };
491        let def_name = {
492            let agent = agent.lock().await;
493            agent.def_name().to_string()
494        };
495        let uses_native_thread = {
496            let defs = self.defs.lock().unwrap();
497            let Some(def) = defs.get(&def_name) else {
498                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
499            };
500            def.native_thread
501        };
502        let agent_status = {
503            let agent = agent.lock().await;
504            agent.status().clone()
505        };
506        if agent_status == AgentStatus::Init {
507            log::info!("Starting agent {}", agent_id);
508
509            if uses_native_thread {
510                let (tx, rx) = std::sync::mpsc::channel();
511
512                {
513                    let mut agent_txs = self.agent_txs.lock().unwrap();
514                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
515                };
516
517                let agent_id = agent_id.to_string();
518                std::thread::spawn(async move || {
519                    if let Err(e) = agent.lock().await.start() {
520                        log::error!("Failed to start agent {}: {}", agent_id, e);
521                    }
522
523                    while let Ok(message) = rx.recv() {
524                        match message {
525                            AgentMessage::Input { ctx, data } => {
526                                agent
527                                    .lock()
528                                    .await
529                                    .process(ctx, data)
530                                    .await
531                                    .unwrap_or_else(|e| {
532                                        log::error!("Process Error {}: {}", agent_id, e);
533                                    });
534                            }
535                            AgentMessage::Config { config } => {
536                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
537                                    log::error!("Config Error {}: {}", agent_id, e);
538                                });
539                            }
540                            AgentMessage::Stop => {
541                                break;
542                            }
543                        }
544                    }
545                });
546            } else {
547                let (tx, mut rx) = mpsc::channel(32);
548
549                {
550                    let mut agent_txs = self.agent_txs.lock().unwrap();
551                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
552                };
553
554                let agent_id = agent_id.to_string();
555                tokio::spawn(async move {
556                    {
557                        let mut agent_guard = agent.lock().await;
558                        if let Err(e) = agent_guard.start() {
559                            log::error!("Failed to start agent {}: {}", agent_id, e);
560                        }
561                    }
562
563                    while let Some(message) = rx.recv().await {
564                        match message {
565                            AgentMessage::Input { ctx, data } => {
566                                agent
567                                    .lock()
568                                    .await
569                                    .process(ctx, data)
570                                    .await
571                                    .unwrap_or_else(|e| {
572                                        log::error!("Process Error {}: {}", agent_id, e);
573                                    });
574                            }
575                            AgentMessage::Config { config } => {
576                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
577                                    log::error!("Config Error {}: {}", agent_id, e);
578                                });
579                            }
580                            AgentMessage::Stop => {
581                                rx.close();
582                                return;
583                            }
584                        }
585                    }
586                });
587            }
588        }
589        Ok(())
590    }
591
592    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
593        let agent = {
594            let agents = self.agents.lock().unwrap();
595            let Some(a) = agents.get(agent_id) else {
596                return Err(AgentError::AgentNotFound(agent_id.to_string()));
597            };
598            a.clone()
599        };
600
601        let agent_status = {
602            let agent = agent.lock().await;
603            agent.status().clone()
604        };
605        if agent_status == AgentStatus::Start {
606            log::info!("Stopping agent {}", agent_id);
607
608            {
609                let mut agent_txs = self.agent_txs.lock().unwrap();
610                if let Some(tx) = agent_txs.remove(agent_id) {
611                    match tx {
612                        AgentMessageSender::Sync(tx) => {
613                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
614                                log::error!(
615                                    "Failed to send stop message to agent {}: {}",
616                                    agent_id,
617                                    e
618                                );
619                            });
620                        }
621                        AgentMessageSender::Async(tx) => {
622                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
623                                log::error!(
624                                    "Failed to send stop message to agent {}: {}",
625                                    agent_id,
626                                    e
627                                );
628                            });
629                        }
630                    }
631                }
632            }
633
634            agent.lock().await.stop()?;
635        }
636
637        Ok(())
638    }
639
640    pub async fn set_agent_config(
641        &self,
642        agent_id: String,
643        config: AgentConfig,
644    ) -> Result<(), AgentError> {
645        let agent = {
646            let agents = self.agents.lock().unwrap();
647            let Some(a) = agents.get(&agent_id) else {
648                return Err(AgentError::AgentNotFound(agent_id.to_string()));
649            };
650            a.clone()
651        };
652
653        let agent_status = {
654            let agent = agent.lock().await;
655            agent.status().clone()
656        };
657        if agent_status == AgentStatus::Init {
658            agent.lock().await.set_config(config.clone())?;
659        } else if agent_status == AgentStatus::Start {
660            let tx = {
661                let agent_txs = self.agent_txs.lock().unwrap();
662                let Some(tx) = agent_txs.get(&agent_id) else {
663                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
664                };
665                tx.clone()
666            };
667            let message = AgentMessage::Config { config };
668            match tx {
669                AgentMessageSender::Sync(tx) => {
670                    tx.send(message).map_err(|_| {
671                        AgentError::SendMessageFailed("Failed to send config message".to_string())
672                    })?;
673                }
674                AgentMessageSender::Async(tx) => {
675                    tx.send(message).await.map_err(|_| {
676                        AgentError::SendMessageFailed("Failed to send config message".to_string())
677                    })?;
678                }
679            }
680        }
681        Ok(())
682    }
683
684    pub fn get_global_config(&self, def_name: &str) -> Option<AgentConfig> {
685        let global_configs = self.global_configs.lock().unwrap();
686        global_configs.get(def_name).cloned()
687    }
688
689    pub fn set_global_config(&self, def_name: String, config: AgentConfig) {
690        let mut global_configs = self.global_configs.lock().unwrap();
691
692        let Some(existing_config) = global_configs.get_mut(&def_name) else {
693            global_configs.insert(def_name, config);
694            return;
695        };
696
697        for (key, value) in config {
698            existing_config.set(key, value);
699        }
700    }
701
702    pub fn set_global_configs(&self, new_configs: AgentConfigs) {
703        for (agent_name, new_config) in new_configs {
704            self.set_global_config(agent_name, new_config);
705        }
706    }
707
708    pub fn get_global_configs(&self) -> AgentConfigs {
709        let global_configs = self.global_configs.lock().unwrap();
710        global_configs.clone()
711    }
712
713    pub(crate) async fn agent_input(
714        &self,
715        agent_id: String,
716        ctx: AgentContext,
717        data: AgentData,
718    ) -> Result<(), AgentError> {
719        let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
720            let agents = self.agents.lock().unwrap();
721            let Some(a) = agents.get(&agent_id) else {
722                return Err(AgentError::AgentNotFound(agent_id.to_string()));
723            };
724            a.clone()
725        };
726
727        let agent_status = {
728            let agent = agent.lock().await;
729            agent.status().clone()
730        };
731        if agent_status == AgentStatus::Start {
732            let ch = ctx.port().to_string();
733            let message = AgentMessage::Input { ctx, data };
734
735            let tx = {
736                let agent_txs = self.agent_txs.lock().unwrap();
737                let Some(tx) = agent_txs.get(&agent_id) else {
738                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
739                };
740                tx.clone()
741            };
742            match tx {
743                AgentMessageSender::Sync(tx) => {
744                    tx.send(message).map_err(|_| {
745                        AgentError::SendMessageFailed("Failed to send input message".to_string())
746                    })?;
747                }
748                AgentMessageSender::Async(tx) => {
749                    tx.send(message).await.map_err(|_| {
750                        AgentError::SendMessageFailed("Failed to send input message".to_string())
751                    })?;
752                }
753            }
754
755            self.emit_input(agent_id.to_string(), ch);
756        }
757        Ok(())
758    }
759
    /// Forwards an agent's output by delegating to `message::send_agent_out`
    /// and awaiting its result.
    pub async fn send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        data: AgentData,
    ) -> Result<(), AgentError> {
        message::send_agent_out(self, agent_id, ctx, data).await
    }
768
    /// Synchronous counterpart of `send_agent_out`; delegates to
    /// `message::try_send_agent_out`.
    pub fn try_send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        data: AgentData,
    ) -> Result<(), AgentError> {
        message::try_send_agent_out(self, agent_id, ctx, data)
    }
777
778    pub fn write_board_data(&self, name: String, data: AgentData) -> Result<(), AgentError> {
779        self.try_send_board_out(name, AgentContext::new(), data)
780    }
781
    /// Sends `data` to board `name` by delegating to
    /// `message::try_send_board_out`.
    pub(crate) fn try_send_board_out(
        &self,
        name: String,
        ctx: AgentContext,
        data: AgentData,
    ) -> Result<(), AgentError> {
        message::try_send_board_out(self, name, ctx, data)
    }
790
791    fn spawn_message_loop(&self) -> Result<(), AgentError> {
792        // TODO: settings for the channel size
793        let (tx, mut rx) = mpsc::channel(4096);
794        {
795            let mut tx_lock = self.tx.lock().unwrap();
796            *tx_lock = Some(tx);
797        }
798
799        // spawn the main loop
800        let askit = self.clone();
801        tokio::spawn(async move {
802            while let Some(message) = rx.recv().await {
803                use AgentEventMessage::*;
804
805                match message {
806                    AgentOut { agent, ctx, data } => {
807                        message::agent_out(&askit, agent, ctx, data).await;
808                    }
809                    BoardOut { name, ctx, data } => {
810                        message::board_out(&askit, name, ctx, data).await;
811                    }
812                }
813            }
814        });
815
816        Ok(())
817    }
818
819    async fn start_agent_flows(&self) -> Result<(), AgentError> {
820        let agent_flow_names;
821        {
822            let agent_flows = self.flows.lock().unwrap();
823            agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
824        }
825        for name in agent_flow_names {
826            self.start_agent_flow(&name).await.unwrap_or_else(|e| {
827                log::error!("Failed to start agent flow: {}", e);
828            });
829        }
830        Ok(())
831    }
832
833    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
834        let mut observers = self.observers.lock().unwrap();
835        let observer_id = new_observer_id();
836        observers.insert(observer_id, observer);
837        observer_id
838    }
839
840    pub fn unsubscribe(&self, observer_id: usize) {
841        let mut observers = self.observers.lock().unwrap();
842        observers.remove(&observer_id);
843    }
844
845    pub(crate) fn emit_error(&self, agent_id: String, message: String) {
846        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
847    }
848
849    pub(crate) fn emit_input(&self, agent_id: String, ch: String) {
850        self.notify_observers(ASKitEvent::AgentIn(agent_id, ch));
851    }
852
853    pub(crate) fn emit_display(&self, agent_id: String, key: String, data: AgentData) {
854        self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, data));
855    }
856
857    pub(crate) fn emit_board(&self, name: String, data: AgentData) {
858        self.notify_observers(ASKitEvent::Board(name, data));
859    }
860
861    fn notify_observers(&self, event: ASKitEvent) {
862        let observers = self.observers.lock().unwrap();
863        for (_id, observer) in observers.iter() {
864            observer.notify(&event);
865        }
866    }
867}
868
/// Events passed to every registered `ASKitObserver`.
#[derive(Clone, Debug)]
pub enum ASKitEvent {
    /// Built by `ASKit::emit_input`.
    AgentIn(String, String),                 // (agent_id, channel)
    /// Built by `ASKit::emit_display`.
    AgentDisplay(String, String, AgentData), // (agent_id, key, data)
    /// Built by `ASKit::emit_error`.
    AgentError(String, String),              // (agent_id, message)
    /// Built by `ASKit::emit_board`.
    Board(String, AgentData),                // (board name, data)
}
876
/// Receiver of `ASKitEvent`s; register an implementation with
/// `ASKit::subscribe`.
pub trait ASKitObserver {
    /// Called once per emitted event. `ASKit::notify_observers` invokes this
    /// while holding the observers lock.
    fn notify(&self, event: &ASKitEvent);
}
880
/// Source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the current counter value and advances the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
886
887// Agent Message
888
/// Sending half of an agent's message channel, as stored in `ASKit::agent_txs`.
#[derive(Clone)]
pub enum AgentMessageSender {
    /// `std::sync::mpsc` sender; `ASKit::start_agent` creates it for agents
    /// whose definition sets `native_thread`.
    Sync(std::sync::mpsc::Sender<AgentMessage>),
    /// Tokio `mpsc` sender; `ASKit::start_agent` creates it for all other agents.
    Async(mpsc::Sender<AgentMessage>),
}