agent_stream_kit/
askit.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::board_agent;
9use crate::config::{AgentConfig, AgentConfigs};
10use crate::context::AgentContext;
11use crate::data::AgentData;
12use crate::definition::{AgentDefaultConfig, AgentDefinition, AgentDefinitions};
13use crate::error::AgentError;
14use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
15use crate::message::{self, AgentEventMessage};
16
17#[derive(Clone)]
18pub struct ASKit {
19    // agent id -> agent
20    pub(crate) agents:
21        Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,
22
23    // agent id -> sender
24    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
25
26    // board name -> [board out agent id]
27    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
28
29    // board name -> data
30    pub(crate) board_data: Arc<Mutex<HashMap<String, AgentData>>>,
31
32    // sourece agent id -> [target agent id / source handle / target handle]
33    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
34
35    // agent def name -> agent definition
36    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
37
38    // agent flows
39    pub(crate) flows: Arc<Mutex<AgentFlows>>,
40
41    // agent def name -> config
42    pub(crate) global_configs: Arc<Mutex<HashMap<String, AgentConfig>>>,
43
44    // message sender
45    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
46
47    // observers
48    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
49}
50
51impl ASKit {
52    pub fn new() -> Self {
53        Self {
54            agents: Default::default(),
55            agent_txs: Default::default(),
56            board_out_agents: Default::default(),
57            board_data: Default::default(),
58            edges: Default::default(),
59            defs: Default::default(),
60            flows: Default::default(),
61            global_configs: Default::default(),
62            tx: Arc::new(Mutex::new(None)),
63            observers: Default::default(),
64        }
65    }
66
67    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68        self.tx
69            .lock()
70            .unwrap()
71            .clone()
72            .ok_or(AgentError::TxNotInitialized)
73    }
74
75    pub fn init() -> Result<Self, AgentError> {
76        let askit = Self::new();
77        askit.register_agents();
78        Ok(askit)
79    }
80
81    fn register_agents(&self) {
82        board_agent::register_agents(self);
83    }
84
85    pub async fn ready(&self) -> Result<(), AgentError> {
86        self.spawn_message_loop()?;
87        self.start_agent_flows().await?;
88        Ok(())
89    }
90
91    pub fn quit(&self) {
92        let mut tx_lock = self.tx.lock().unwrap();
93        *tx_lock = None;
94    }
95
96    pub fn register_agent(&self, def: AgentDefinition) {
97        let def_name = def.name.clone();
98        let def_global_config = def.global_config.clone();
99
100        let mut defs = self.defs.lock().unwrap();
101        defs.insert(def.name.clone(), def);
102
103        // if there is a global config, set it
104        if let Some(def_global_config) = def_global_config {
105            let mut new_config = AgentConfig::default();
106            for (key, config_entry) in def_global_config.iter() {
107                new_config.set(key.clone(), config_entry.value.clone());
108            }
109            self.set_global_config(def_name, new_config);
110        }
111    }
112
113    pub fn get_agent_definitions(&self) -> AgentDefinitions {
114        let defs = self.defs.lock().unwrap();
115        defs.clone()
116    }
117
118    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119        let defs = self.defs.lock().unwrap();
120        defs.get(def_name).cloned()
121    }
122
123    pub fn get_agent_default_config(&self, def_name: &str) -> Option<AgentDefaultConfig> {
124        let defs = self.defs.lock().unwrap();
125        let Some(def) = defs.get(def_name) else {
126            return None;
127        };
128        def.default_config.clone()
129    }
130
131    // // flow
132
133    pub fn get_agent_flows(&self) -> AgentFlows {
134        let flows = self.flows.lock().unwrap();
135        flows.clone()
136    }
137
138    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139        if !Self::is_valid_flow_name(name) {
140            return Err(AgentError::InvalidFlowName(name.into()));
141        }
142
143        let new_name = self.unique_flow_name(name);
144        let mut flows = self.flows.lock().unwrap();
145        let flow = AgentFlow::new(new_name.clone());
146        flows.insert(new_name, flow.clone());
147        Ok(flow)
148    }
149
150    pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
151        if !Self::is_valid_flow_name(new_name) {
152            return Err(AgentError::InvalidFlowName(new_name.into()));
153        }
154
155        // check if the new name is already used
156        let new_name = self.unique_flow_name(new_name);
157
158        let mut flows = self.flows.lock().unwrap();
159
160        // remove the original flow
161        let Some(mut flow) = flows.remove(old_name) else {
162            return Err(AgentError::RenameFlowFailed(old_name.into()));
163        };
164
165        // insert renamed flow
166        flow.set_name(new_name.clone());
167        flows.insert(new_name.clone(), flow);
168        Ok(new_name)
169    }
170
171    fn is_valid_flow_name(new_name: &str) -> bool {
172        // Check if the name is empty
173        if new_name.trim().is_empty() {
174            return false;
175        }
176
177        // Checks for path-like names:
178        if new_name.contains('/') {
179            // Disallow leading, trailing, or consecutive slashes
180            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
181                return false;
182            }
183            // Disallow segments that are "." or ".."
184            if new_name
185                .split('/')
186                .any(|segment| segment == "." || segment == "..")
187            {
188                return false;
189            }
190        }
191
192        // Check if the name contains invalid characters
193        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
194        for c in invalid_chars {
195            if new_name.contains(c) {
196                return false;
197            }
198        }
199
200        true
201    }
202
203    pub fn unique_flow_name(&self, name: &str) -> String {
204        let mut new_name = name.trim().to_string();
205        let mut i = 2;
206        let flows = self.flows.lock().unwrap();
207        while flows.contains_key(&new_name) {
208            new_name = format!("{}{}", name, i);
209            i += 1;
210        }
211        new_name
212    }
213
214    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215        let name = agent_flow.name();
216
217        // add the given flow into flows
218        {
219            let mut flows = self.flows.lock().unwrap();
220            if flows.contains_key(name) {
221                return Err(AgentError::DuplicateFlowName(name.into()));
222            }
223            flows.insert(name.into(), agent_flow.clone());
224        }
225
226        // add nodes into agents
227        for node in agent_flow.nodes().iter() {
228            self.add_agent(name, node).unwrap_or_else(|e| {
229                log::error!("Failed to add_agent_node {}: {}", node.id, e);
230            });
231        }
232
233        // add edges into edges
234        for edge in agent_flow.edges().iter() {
235            self.add_edge(edge).unwrap_or_else(|e| {
236                log::error!("Failed to add_edge {}: {}", edge.source, e);
237            });
238        }
239
240        Ok(())
241    }
242
243    pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
244        let flow = {
245            let mut flows = self.flows.lock().unwrap();
246            let Some(flow) = flows.remove(flow_name) else {
247                return Err(AgentError::FlowNotFound(flow_name.to_string()));
248            };
249            flow.clone()
250        };
251
252        flow.stop(self).await?;
253
254        // Remove all nodes and edges associated with the flow
255        for node in flow.nodes() {
256            self.remove_agent(&node.id).await?;
257        }
258        for edge in flow.edges() {
259            self.remove_edge(edge);
260        }
261
262        Ok(())
263    }
264
265    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266        let flow_name = flow.name();
267
268        let mut flows = self.flows.lock().unwrap();
269        flows.insert(flow_name.to_string(), flow);
270        Ok(())
271    }
272
273    pub fn add_agent_flow_node(
274        &self,
275        flow_name: &str,
276        node: &AgentFlowNode,
277    ) -> Result<(), AgentError> {
278        let mut flows = self.flows.lock().unwrap();
279        let Some(flow) = flows.get_mut(flow_name) else {
280            return Err(AgentError::FlowNotFound(flow_name.to_string()));
281        };
282        flow.add_node(node.clone());
283        self.add_agent(flow_name, node)?;
284        Ok(())
285    }
286
287    pub(crate) fn add_agent(
288        &self,
289        flow_name: &str,
290        node: &AgentFlowNode,
291    ) -> Result<(), AgentError> {
292        let mut agents = self.agents.lock().unwrap();
293        if agents.contains_key(&node.id) {
294            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
295        }
296        if let Ok(mut agent) = agent_new(
297            self.clone(),
298            node.id.clone(),
299            &node.def_name,
300            node.config.clone(),
301        ) {
302            agent.set_flow_name(flow_name.to_string());
303            agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
304        } else {
305            return Err(AgentError::AgentCreationFailed(node.id.to_string()));
306        }
307        Ok(())
308    }
309
310    pub fn add_agent_flow_edge(
311        &self,
312        flow_name: &str,
313        edge: &AgentFlowEdge,
314    ) -> Result<(), AgentError> {
315        let mut flows = self.flows.lock().unwrap();
316        let Some(flow) = flows.get_mut(flow_name) else {
317            return Err(AgentError::FlowNotFound(flow_name.to_string()));
318        };
319        flow.add_edge(edge.clone());
320        self.add_edge(edge)?;
321        Ok(())
322    }
323
324    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
325        // check if the source agent exists
326        {
327            let agents = self.agents.lock().unwrap();
328            if !agents.contains_key(&edge.source) {
329                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
330            }
331        }
332
333        // check if handles are valid
334        if edge.source_handle.is_empty() {
335            return Err(AgentError::EmptySourceHandle);
336        }
337        if edge.target_handle.is_empty() {
338            return Err(AgentError::EmptyTargetHandle);
339        }
340
341        let mut edges = self.edges.lock().unwrap();
342        if let Some(targets) = edges.get_mut(&edge.source) {
343            if targets
344                .iter()
345                .any(|(target, source_handle, target_handle)| {
346                    *target == edge.target
347                        && *source_handle == edge.source_handle
348                        && *target_handle == edge.target_handle
349                })
350            {
351                return Err(AgentError::EdgeAlreadyExists);
352            }
353            targets.push((
354                edge.target.clone(),
355                edge.source_handle.clone(),
356                edge.target_handle.clone(),
357            ));
358        } else {
359            edges.insert(
360                edge.source.clone(),
361                vec![(
362                    edge.target.clone(),
363                    edge.source_handle.clone(),
364                    edge.target_handle.clone(),
365                )],
366            );
367        }
368        Ok(())
369    }
370
371    pub async fn remove_agent_flow_node(
372        &self,
373        flow_name: &str,
374        node_id: &str,
375    ) -> Result<(), AgentError> {
376        {
377            let mut flows = self.flows.lock().unwrap();
378            let Some(flow) = flows.get_mut(flow_name) else {
379                return Err(AgentError::FlowNotFound(flow_name.to_string()));
380            };
381            flow.remove_node(node_id);
382        }
383        self.remove_agent(node_id).await?;
384        Ok(())
385    }
386
387    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
388        self.stop_agent(agent_id).await?;
389
390        // remove from edges
391        {
392            let mut edges = self.edges.lock().unwrap();
393            let mut sources_to_remove = Vec::new();
394            for (source, targets) in edges.iter_mut() {
395                targets.retain(|(target, _, _)| target != agent_id);
396                if targets.is_empty() {
397                    sources_to_remove.push(source.clone());
398                }
399            }
400            for source in sources_to_remove {
401                edges.remove(&source);
402            }
403            edges.remove(agent_id);
404        }
405
406        // remove from agents
407        {
408            let mut agents = self.agents.lock().unwrap();
409            agents.remove(agent_id);
410        }
411
412        Ok(())
413    }
414
415    pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
416        let mut flows = self.flows.lock().unwrap();
417        let Some(flow) = flows.get_mut(flow_name) else {
418            return Err(AgentError::FlowNotFound(flow_name.to_string()));
419        };
420        let Some(edge) = flow.remove_edge(edge_id) else {
421            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
422        };
423        self.remove_edge(&edge);
424        Ok(())
425    }
426
427    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
428        let mut edges = self.edges.lock().unwrap();
429        if let Some(targets) = edges.get_mut(&edge.source) {
430            targets.retain(|(target, source_handle, target_handle)| {
431                *target != edge.target
432                    || *source_handle != edge.source_handle
433                    || *target_handle != edge.target_handle
434            });
435            if targets.is_empty() {
436                edges.remove(&edge.source);
437            }
438        }
439    }
440
441    pub fn copy_sub_flow(
442        &self,
443        nodes: &Vec<AgentFlowNode>,
444        edges: &Vec<AgentFlowEdge>,
445    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
446        flow::copy_sub_flow(nodes, edges)
447    }
448
449    pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
450        let flows = self.flows.lock().unwrap();
451        let Some(flow) = flows.get(name) else {
452            return Err(AgentError::FlowNotFound(name.to_string()));
453        };
454        flow.start(self).await?;
455        Ok(())
456    }
457
458    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
459        let agent = {
460            let agents = self.agents.lock().unwrap();
461            let Some(a) = agents.get(agent_id) else {
462                return Err(AgentError::AgentNotFound(agent_id.to_string()));
463            };
464            a.clone()
465        };
466        let def_name = {
467            let agent = agent.lock().await;
468            agent.def_name().to_string()
469        };
470        let uses_native_thread = {
471            let defs = self.defs.lock().unwrap();
472            let Some(def) = defs.get(&def_name) else {
473                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
474            };
475            def.native_thread
476        };
477        let agent_status = {
478            let agent = agent.lock().await;
479            agent.status().clone()
480        };
481        if agent_status == AgentStatus::Init {
482            log::info!("Starting agent {}", agent_id);
483
484            if uses_native_thread {
485                let (tx, rx) = std::sync::mpsc::channel();
486
487                {
488                    let mut agent_txs = self.agent_txs.lock().unwrap();
489                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
490                };
491
492                let agent_id = agent_id.to_string();
493                std::thread::spawn(async move || {
494                    if let Err(e) = agent.lock().await.start() {
495                        log::error!("Failed to start agent {}: {}", agent_id, e);
496                    }
497
498                    while let Ok(message) = rx.recv() {
499                        match message {
500                            AgentMessage::Input { ctx, data } => {
501                                agent
502                                    .lock()
503                                    .await
504                                    .process(ctx, data)
505                                    .await
506                                    .unwrap_or_else(|e| {
507                                        log::error!("Process Error {}: {}", agent_id, e);
508                                    });
509                            }
510                            AgentMessage::Config { config } => {
511                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
512                                    log::error!("Config Error {}: {}", agent_id, e);
513                                });
514                            }
515                            AgentMessage::Stop => {
516                                break;
517                            }
518                        }
519                    }
520                });
521            } else {
522                let (tx, mut rx) = mpsc::channel(32);
523
524                {
525                    let mut agent_txs = self.agent_txs.lock().unwrap();
526                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
527                };
528
529                let agent_id = agent_id.to_string();
530                tokio::spawn(async move {
531                    {
532                        let mut agent_guard = agent.lock().await;
533                        if let Err(e) = agent_guard.start() {
534                            log::error!("Failed to start agent {}: {}", agent_id, e);
535                        }
536                    }
537
538                    while let Some(message) = rx.recv().await {
539                        match message {
540                            AgentMessage::Input { ctx, data } => {
541                                agent
542                                    .lock()
543                                    .await
544                                    .process(ctx, data)
545                                    .await
546                                    .unwrap_or_else(|e| {
547                                        log::error!("Process Error {}: {}", agent_id, e);
548                                    });
549                            }
550                            AgentMessage::Config { config } => {
551                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
552                                    log::error!("Config Error {}: {}", agent_id, e);
553                                });
554                            }
555                            AgentMessage::Stop => {
556                                rx.close();
557                                return;
558                            }
559                        }
560                    }
561                });
562            }
563        }
564        Ok(())
565    }
566
567    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
568        let agent = {
569            let agents = self.agents.lock().unwrap();
570            let Some(a) = agents.get(agent_id) else {
571                return Err(AgentError::AgentNotFound(agent_id.to_string()));
572            };
573            a.clone()
574        };
575
576        let agent_status = {
577            let agent = agent.lock().await;
578            agent.status().clone()
579        };
580        if agent_status == AgentStatus::Start {
581            log::info!("Stopping agent {}", agent_id);
582
583            {
584                let mut agent_txs = self.agent_txs.lock().unwrap();
585                if let Some(tx) = agent_txs.remove(agent_id) {
586                    match tx {
587                        AgentMessageSender::Sync(tx) => {
588                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
589                                log::error!(
590                                    "Failed to send stop message to agent {}: {}",
591                                    agent_id,
592                                    e
593                                );
594                            });
595                        }
596                        AgentMessageSender::Async(tx) => {
597                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
598                                log::error!(
599                                    "Failed to send stop message to agent {}: {}",
600                                    agent_id,
601                                    e
602                                );
603                            });
604                        }
605                    }
606                }
607            }
608
609            agent.lock().await.stop()?;
610        }
611
612        Ok(())
613    }
614
615    pub async fn set_agent_config(
616        &self,
617        agent_id: String,
618        config: AgentConfig,
619    ) -> Result<(), AgentError> {
620        let agent = {
621            let agents = self.agents.lock().unwrap();
622            let Some(a) = agents.get(&agent_id) else {
623                return Err(AgentError::AgentNotFound(agent_id.to_string()));
624            };
625            a.clone()
626        };
627
628        let agent_status = {
629            let agent = agent.lock().await;
630            agent.status().clone()
631        };
632        if agent_status == AgentStatus::Init {
633            agent.lock().await.set_config(config.clone())?;
634        } else if agent_status == AgentStatus::Start {
635            let tx = {
636                let agent_txs = self.agent_txs.lock().unwrap();
637                let Some(tx) = agent_txs.get(&agent_id) else {
638                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
639                };
640                tx.clone()
641            };
642            let message = AgentMessage::Config { config };
643            match tx {
644                AgentMessageSender::Sync(tx) => {
645                    tx.send(message).map_err(|_| {
646                        AgentError::SendMessageFailed("Failed to send config message".to_string())
647                    })?;
648                }
649                AgentMessageSender::Async(tx) => {
650                    tx.send(message).await.map_err(|_| {
651                        AgentError::SendMessageFailed("Failed to send config message".to_string())
652                    })?;
653                }
654            }
655        }
656        Ok(())
657    }
658
659    pub fn get_global_config(&self, def_name: &str) -> Option<AgentConfig> {
660        let global_configs = self.global_configs.lock().unwrap();
661        global_configs.get(def_name).cloned()
662    }
663
664    pub fn set_global_config(&self, def_name: String, config: AgentConfig) {
665        let mut global_configs = self.global_configs.lock().unwrap();
666
667        let Some(existing_config) = global_configs.get_mut(&def_name) else {
668            global_configs.insert(def_name, config);
669            return;
670        };
671
672        for (key, value) in config {
673            existing_config.set(key, value);
674        }
675    }
676
677    pub fn set_global_configs(&self, new_configs: AgentConfigs) {
678        for (agent_name, new_config) in new_configs {
679            self.set_global_config(agent_name, new_config);
680        }
681    }
682
683    pub fn get_global_configs(&self) -> AgentConfigs {
684        let global_configs = self.global_configs.lock().unwrap();
685        global_configs.clone()
686    }
687
688    pub async fn agent_input(
689        &self,
690        agent_id: String,
691        ctx: AgentContext,
692        data: AgentData,
693    ) -> Result<(), AgentError> {
694        let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
695            let agents = self.agents.lock().unwrap();
696            let Some(a) = agents.get(&agent_id) else {
697                return Err(AgentError::AgentNotFound(agent_id.to_string()));
698            };
699            a.clone()
700        };
701
702        let agent_status = {
703            let agent = agent.lock().await;
704            agent.status().clone()
705        };
706        if agent_status == AgentStatus::Start {
707            let ch = ctx.port().to_string();
708            let message = AgentMessage::Input { ctx, data };
709
710            let tx = {
711                let agent_txs = self.agent_txs.lock().unwrap();
712                let Some(tx) = agent_txs.get(&agent_id) else {
713                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
714                };
715                tx.clone()
716            };
717            match tx {
718                AgentMessageSender::Sync(tx) => {
719                    tx.send(message).map_err(|_| {
720                        AgentError::SendMessageFailed("Failed to send input message".to_string())
721                    })?;
722                }
723                AgentMessageSender::Async(tx) => {
724                    tx.send(message).await.map_err(|_| {
725                        AgentError::SendMessageFailed("Failed to send input message".to_string())
726                    })?;
727                }
728            }
729
730            self.emit_input(agent_id.to_string(), ch);
731        }
732        Ok(())
733    }
734
735    pub async fn send_agent_out(
736        &self,
737        agent_id: String,
738        ctx: AgentContext,
739        data: AgentData,
740    ) -> Result<(), AgentError> {
741        message::send_agent_out(self, agent_id, ctx, data).await
742    }
743
744    pub fn try_send_agent_out(
745        &self,
746        agent_id: String,
747        ctx: AgentContext,
748        data: AgentData,
749    ) -> Result<(), AgentError> {
750        message::try_send_agent_out(self, agent_id, ctx, data)
751    }
752
753    pub fn try_send_board_out(
754        &self,
755        name: String,
756        ctx: AgentContext,
757        data: AgentData,
758    ) -> Result<(), AgentError> {
759        message::try_send_board_out(self, name, ctx, data)
760    }
761
762    fn spawn_message_loop(&self) -> Result<(), AgentError> {
763        // TODO: settings for the channel size
764        let (tx, mut rx) = mpsc::channel(4096);
765        {
766            let mut tx_lock = self.tx.lock().unwrap();
767            *tx_lock = Some(tx);
768        }
769
770        // spawn the main loop
771        let askit = self.clone();
772        tokio::spawn(async move {
773            while let Some(message) = rx.recv().await {
774                use AgentEventMessage::*;
775
776                match message {
777                    AgentOut { agent, ctx, data } => {
778                        message::agent_out(&askit, agent, ctx, data).await;
779                    }
780                    BoardOut { name, ctx, data } => {
781                        message::board_out(&askit, name, ctx, data).await;
782                    }
783                }
784            }
785        });
786
787        Ok(())
788    }
789
790    async fn start_agent_flows(&self) -> Result<(), AgentError> {
791        let agent_flow_names;
792        {
793            let agent_flows = self.flows.lock().unwrap();
794            agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
795        }
796        for name in agent_flow_names {
797            self.start_agent_flow(&name).await.unwrap_or_else(|e| {
798                log::error!("Failed to start agent flow: {}", e);
799            });
800        }
801        Ok(())
802    }
803
804    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
805        let mut observers = self.observers.lock().unwrap();
806        let observer_id = new_observer_id();
807        observers.insert(observer_id, observer);
808        observer_id
809    }
810
811    pub fn unsubscribe(&self, observer_id: usize) {
812        let mut observers = self.observers.lock().unwrap();
813        observers.remove(&observer_id);
814    }
815
816    pub(crate) fn emit_error(&self, agent_id: String, message: String) {
817        self.notify_observers(ASKitEvent::AgentError(agent_id.clone(), message.clone()));
818    }
819
820    pub(crate) fn emit_input(&self, agent_id: String, ch: String) {
821        self.notify_observers(ASKitEvent::AgentIn(agent_id.clone(), ch.clone()));
822    }
823
824    pub(crate) fn emit_display(&self, agent_id: String, key: String, data: AgentData) {
825        self.notify_observers(ASKitEvent::AgentDisplay(
826            agent_id.clone(),
827            key.clone(),
828            data.clone(),
829        ));
830    }
831
832    fn notify_observers(&self, event: ASKitEvent) {
833        let observers = self.observers.lock().unwrap();
834        for (_id, observer) in observers.iter() {
835            observer.notify(event.clone());
836        }
837    }
838}
839
840#[derive(Clone, Debug)]
841pub enum ASKitEvent {
842    AgentIn(String, String),                 // (agent_id, channel)
843    AgentDisplay(String, String, AgentData), // (agent_id, key, data)
844    AgentError(String, String),              // (agent_id, message)
845}
846
847pub trait ASKitObserver {
848    fn notify(&self, event: ASKitEvent);
849}
850
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the counter's current value and advances it by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
856
857// Agent Message
858
859#[derive(Clone)]
860pub enum AgentMessageSender {
861    Sync(std::sync::mpsc::Sender<AgentMessage>),
862    Async(mpsc::Sender<AgentMessage>),
863}