agent_stream_kit/
askit.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::board_agent;
8
9use super::agent::{Agent, AgentMessage, AgentStatus, agent_new};
10use super::config::AgentConfig;
11use super::context::AgentContext;
12use super::data::AgentData;
13use super::definition::{AgentDefaultConfig, AgentDefinition, AgentDefinitions};
14use super::error::AgentError;
15use super::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
16use super::message::{self, AgentEventMessage};
17
18#[derive(Clone)]
19pub struct ASKit {
20    // agent id -> agent
21    pub(crate) agents:
22        Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,
23
24    // agent id -> sender
25    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
26
27    // board name -> [board out agent id]
28    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
29
30    // board name -> data
31    pub(crate) board_data: Arc<Mutex<HashMap<String, AgentData>>>,
32
33    // sourece agent id -> [target agent id / source handle / target handle]
34    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
35
36    // agent def name -> agent definition
37    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
38
39    // agent flows
40    pub(crate) flows: Arc<Mutex<AgentFlows>>,
41
42    // agent def name -> config
43    pub(crate) global_configs: Arc<Mutex<HashMap<String, AgentConfig>>>,
44
45    // message sender
46    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
47
48    // observers
49    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
50}
51
52impl ASKit {
53    pub fn new() -> Self {
54        Self {
55            agents: Default::default(),
56            agent_txs: Default::default(),
57            board_out_agents: Default::default(),
58            board_data: Default::default(),
59            edges: Default::default(),
60            defs: Default::default(),
61            flows: Default::default(),
62            global_configs: Default::default(),
63            tx: Arc::new(Mutex::new(None)),
64            observers: Default::default(),
65        }
66    }
67
68    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69        self.tx
70            .lock()
71            .unwrap()
72            .clone()
73            .ok_or(AgentError::TxNotInitialized)
74    }
75
76    pub fn init() -> Result<Self, AgentError> {
77        let askit = Self::new();
78        askit.register_agents();
79        Ok(askit)
80    }
81
82    fn register_agents(&self) {
83        board_agent::register_agents(self);
84    }
85
86    pub async fn ready(&self) -> Result<(), AgentError> {
87        self.spawn_message_loop()?;
88        self.start_agent_flows().await?;
89        Ok(())
90    }
91
92    pub fn quit(&self) {
93        let mut tx_lock = self.tx.lock().unwrap();
94        *tx_lock = None;
95    }
96
97    pub fn register_agent(&self, def: AgentDefinition) {
98        let mut defs = self.defs.lock().unwrap();
99        defs.insert(def.name.clone(), def);
100    }
101
102    pub fn get_agent_definitions(&self) -> AgentDefinitions {
103        let defs = self.defs.lock().unwrap();
104        defs.clone()
105    }
106
107    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
108        let defs = self.defs.lock().unwrap();
109        defs.get(def_name).cloned()
110    }
111
112    pub fn get_agent_default_config(&self, def_name: &str) -> Option<AgentDefaultConfig> {
113        let defs = self.defs.lock().unwrap();
114        let Some(def) = defs.get(def_name) else {
115            return None;
116        };
117        def.default_config.clone()
118    }
119
120    // // flow
121
122    pub fn get_agent_flows(&self) -> AgentFlows {
123        let flows = self.flows.lock().unwrap();
124        flows.clone()
125    }
126
127    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
128        if !Self::is_valid_flow_name(name) {
129            return Err(AgentError::InvalidFlowName(name.into()));
130        }
131
132        let new_name = self.unique_flow_name(name);
133        let mut flows = self.flows.lock().unwrap();
134        let flow = AgentFlow::new(new_name.clone());
135        flows.insert(new_name, flow.clone());
136        Ok(flow)
137    }
138
139    pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
140        if !Self::is_valid_flow_name(new_name) {
141            return Err(AgentError::InvalidFlowName(new_name.into()));
142        }
143
144        // check if the new name is already used
145        let new_name = self.unique_flow_name(new_name);
146
147        let mut flows = self.flows.lock().unwrap();
148
149        // remove the original flow
150        let Some(mut flow) = flows.remove(old_name) else {
151            return Err(AgentError::RenameFlowFailed(old_name.into()));
152        };
153
154        // insert renamed flow
155        flow.set_name(new_name.clone());
156        flows.insert(new_name.clone(), flow);
157        Ok(new_name)
158    }
159
160    fn is_valid_flow_name(new_name: &str) -> bool {
161        // Check if the name is empty
162        if new_name.trim().is_empty() {
163            return false;
164        }
165
166        // Checks for path-like names:
167        if new_name.contains('/') {
168            // Disallow leading, trailing, or consecutive slashes
169            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
170                return false;
171            }
172            // Disallow segments that are "." or ".."
173            if new_name
174                .split('/')
175                .any(|segment| segment == "." || segment == "..")
176            {
177                return false;
178            }
179        }
180
181        // Check if the name contains invalid characters
182        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
183        for c in invalid_chars {
184            if new_name.contains(c) {
185                return false;
186            }
187        }
188
189        true
190    }
191
192    pub fn unique_flow_name(&self, name: &str) -> String {
193        let mut new_name = name.trim().to_string();
194        let mut i = 2;
195        let flows = self.flows.lock().unwrap();
196        while flows.contains_key(&new_name) {
197            new_name = format!("{}{}", name, i);
198            i += 1;
199        }
200        new_name
201    }
202
203    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
204        let name = agent_flow.name();
205
206        // add the given flow into flows
207        {
208            let mut flows = self.flows.lock().unwrap();
209            if flows.contains_key(name) {
210                return Err(AgentError::DuplicateFlowName(name.into()));
211            }
212            flows.insert(name.into(), agent_flow.clone());
213        }
214
215        // add nodes into agents
216        for node in agent_flow.nodes().iter() {
217            self.add_agent(name, node).unwrap_or_else(|e| {
218                log::error!("Failed to add_agent_node {}: {}", node.id, e);
219            });
220        }
221
222        // add edges into edges
223        for edge in agent_flow.edges().iter() {
224            self.add_edge(edge).unwrap_or_else(|e| {
225                log::error!("Failed to add_edge {}: {}", edge.source, e);
226            });
227        }
228
229        Ok(())
230    }
231
232    pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
233        let flow = {
234            let mut flows = self.flows.lock().unwrap();
235            let Some(flow) = flows.remove(flow_name) else {
236                return Err(AgentError::FlowNotFound(flow_name.to_string()));
237            };
238            flow.clone()
239        };
240
241        flow.stop(self).await?;
242
243        // Remove all nodes and edges associated with the flow
244        for node in flow.nodes() {
245            self.remove_agent(&node.id).await?;
246        }
247        for edge in flow.edges() {
248            self.remove_edge(edge);
249        }
250
251        Ok(())
252    }
253
254    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
255        let flow_name = flow.name();
256
257        let mut flows = self.flows.lock().unwrap();
258        flows.insert(flow_name.to_string(), flow);
259        Ok(())
260    }
261
262    pub fn add_agent_flow_node(
263        &self,
264        flow_name: &str,
265        node: &AgentFlowNode,
266    ) -> Result<(), AgentError> {
267        let mut flows = self.flows.lock().unwrap();
268        let Some(flow) = flows.get_mut(flow_name) else {
269            return Err(AgentError::FlowNotFound(flow_name.to_string()));
270        };
271        flow.add_node(node.clone());
272        self.add_agent(flow_name, node)?;
273        Ok(())
274    }
275
276    pub(crate) fn add_agent(
277        &self,
278        flow_name: &str,
279        node: &AgentFlowNode,
280    ) -> Result<(), AgentError> {
281        let mut agents = self.agents.lock().unwrap();
282        if agents.contains_key(&node.id) {
283            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
284        }
285        if let Ok(mut agent) = agent_new(
286            self.clone(),
287            node.id.clone(),
288            &node.def_name,
289            node.config.clone(),
290        ) {
291            agent.set_flow_name(flow_name.to_string());
292            agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
293        } else {
294            return Err(AgentError::AgentCreationFailed(node.id.to_string()));
295        }
296        Ok(())
297    }
298
299    pub fn add_agent_flow_edge(
300        &self,
301        flow_name: &str,
302        edge: &AgentFlowEdge,
303    ) -> Result<(), AgentError> {
304        let mut flows = self.flows.lock().unwrap();
305        let Some(flow) = flows.get_mut(flow_name) else {
306            return Err(AgentError::FlowNotFound(flow_name.to_string()));
307        };
308        flow.add_edge(edge.clone());
309        self.add_edge(edge)?;
310        Ok(())
311    }
312
313    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
314        // check if the source agent exists
315        {
316            let agents = self.agents.lock().unwrap();
317            if !agents.contains_key(&edge.source) {
318                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
319            }
320        }
321
322        // check if handles are valid
323        if edge.source_handle.is_empty() {
324            return Err(AgentError::EmptySourceHandle);
325        }
326        if edge.target_handle.is_empty() {
327            return Err(AgentError::EmptyTargetHandle);
328        }
329
330        let mut edges = self.edges.lock().unwrap();
331        if let Some(targets) = edges.get_mut(&edge.source) {
332            if targets
333                .iter()
334                .any(|(target, source_handle, target_handle)| {
335                    *target == edge.target
336                        && *source_handle == edge.source_handle
337                        && *target_handle == edge.target_handle
338                })
339            {
340                return Err(AgentError::EdgeAlreadyExists);
341            }
342            targets.push((
343                edge.target.clone(),
344                edge.source_handle.clone(),
345                edge.target_handle.clone(),
346            ));
347        } else {
348            edges.insert(
349                edge.source.clone(),
350                vec![(
351                    edge.target.clone(),
352                    edge.source_handle.clone(),
353                    edge.target_handle.clone(),
354                )],
355            );
356        }
357        Ok(())
358    }
359
360    pub async fn remove_agent_flow_node(
361        &self,
362        flow_name: &str,
363        node_id: &str,
364    ) -> Result<(), AgentError> {
365        {
366            let mut flows = self.flows.lock().unwrap();
367            let Some(flow) = flows.get_mut(flow_name) else {
368                return Err(AgentError::FlowNotFound(flow_name.to_string()));
369            };
370            flow.remove_node(node_id);
371        }
372        self.remove_agent(node_id).await?;
373        Ok(())
374    }
375
376    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
377        self.stop_agent(agent_id).await?;
378
379        // remove from edges
380        {
381            let mut edges = self.edges.lock().unwrap();
382            let mut sources_to_remove = Vec::new();
383            for (source, targets) in edges.iter_mut() {
384                targets.retain(|(target, _, _)| target != agent_id);
385                if targets.is_empty() {
386                    sources_to_remove.push(source.clone());
387                }
388            }
389            for source in sources_to_remove {
390                edges.remove(&source);
391            }
392            edges.remove(agent_id);
393        }
394
395        // remove from agents
396        {
397            let mut agents = self.agents.lock().unwrap();
398            agents.remove(agent_id);
399        }
400
401        Ok(())
402    }
403
404    pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
405        let mut flows = self.flows.lock().unwrap();
406        let Some(flow) = flows.get_mut(flow_name) else {
407            return Err(AgentError::FlowNotFound(flow_name.to_string()));
408        };
409        let Some(edge) = flow.remove_edge(edge_id) else {
410            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
411        };
412        self.remove_edge(&edge);
413        Ok(())
414    }
415
416    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
417        let mut edges = self.edges.lock().unwrap();
418        if let Some(targets) = edges.get_mut(&edge.source) {
419            targets.retain(|(target, source_handle, target_handle)| {
420                *target != edge.target
421                    || *source_handle != edge.source_handle
422                    || *target_handle != edge.target_handle
423            });
424            if targets.is_empty() {
425                edges.remove(&edge.source);
426            }
427        }
428    }
429
430    pub fn copy_sub_flow(
431        &self,
432        nodes: &Vec<AgentFlowNode>,
433        edges: &Vec<AgentFlowEdge>,
434    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
435        flow::copy_sub_flow(nodes, edges)
436    }
437
438    pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
439        let flows = self.flows.lock().unwrap();
440        let Some(flow) = flows.get(name) else {
441            return Err(AgentError::FlowNotFound(name.to_string()));
442        };
443        flow.start(self).await?;
444        Ok(())
445    }
446
447    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
448        let agent = {
449            let agents = self.agents.lock().unwrap();
450            let Some(a) = agents.get(agent_id) else {
451                return Err(AgentError::AgentNotFound(agent_id.to_string()));
452            };
453            a.clone()
454        };
455        let def_name = {
456            let agent = agent.lock().await;
457            agent.def_name().to_string()
458        };
459        let uses_native_thread = {
460            let defs = self.defs.lock().unwrap();
461            let Some(def) = defs.get(&def_name) else {
462                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
463            };
464            def.native_thread
465        };
466        let agent_status = {
467            let agent = agent.lock().await;
468            agent.status().clone()
469        };
470        if agent_status == AgentStatus::Init {
471            log::info!("Starting agent {}", agent_id);
472
473            if uses_native_thread {
474                let (tx, rx) = std::sync::mpsc::channel();
475
476                {
477                    let mut agent_txs = self.agent_txs.lock().unwrap();
478                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
479                };
480
481                let agent_id = agent_id.to_string();
482                std::thread::spawn(async move || {
483                    if let Err(e) = agent.lock().await.start() {
484                        log::error!("Failed to start agent {}: {}", agent_id, e);
485                    }
486
487                    while let Ok(message) = rx.recv() {
488                        match message {
489                            AgentMessage::Input { ctx, data } => {
490                                agent
491                                    .lock()
492                                    .await
493                                    .process(ctx, data)
494                                    .await
495                                    .unwrap_or_else(|e| {
496                                        log::error!("Process Error {}: {}", agent_id, e);
497                                    });
498                            }
499                            AgentMessage::Config { config } => {
500                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
501                                    log::error!("Config Error {}: {}", agent_id, e);
502                                });
503                            }
504                            AgentMessage::Stop => {
505                                break;
506                            }
507                        }
508                    }
509                });
510            } else {
511                let (tx, mut rx) = mpsc::channel(32);
512
513                {
514                    let mut agent_txs = self.agent_txs.lock().unwrap();
515                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
516                };
517
518                let agent_id = agent_id.to_string();
519                tokio::spawn(async move {
520                    {
521                        let mut agent_guard = agent.lock().await;
522                        if let Err(e) = agent_guard.start() {
523                            log::error!("Failed to start agent {}: {}", agent_id, e);
524                        }
525                    }
526
527                    while let Some(message) = rx.recv().await {
528                        match message {
529                            AgentMessage::Input { ctx, data } => {
530                                agent
531                                    .lock()
532                                    .await
533                                    .process(ctx, data)
534                                    .await
535                                    .unwrap_or_else(|e| {
536                                        log::error!("Process Error {}: {}", agent_id, e);
537                                    });
538                            }
539                            AgentMessage::Config { config } => {
540                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
541                                    log::error!("Config Error {}: {}", agent_id, e);
542                                });
543                            }
544                            AgentMessage::Stop => {
545                                rx.close();
546                                return;
547                            }
548                        }
549                    }
550                });
551            }
552        }
553        Ok(())
554    }
555
556    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
557        let agent = {
558            let agents = self.agents.lock().unwrap();
559            let Some(a) = agents.get(agent_id) else {
560                return Err(AgentError::AgentNotFound(agent_id.to_string()));
561            };
562            a.clone()
563        };
564
565        let agent_status = {
566            let agent = agent.lock().await;
567            agent.status().clone()
568        };
569        if agent_status == AgentStatus::Start {
570            log::info!("Stopping agent {}", agent_id);
571
572            {
573                let mut agent_txs = self.agent_txs.lock().unwrap();
574                if let Some(tx) = agent_txs.remove(agent_id) {
575                    match tx {
576                        AgentMessageSender::Sync(tx) => {
577                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
578                                log::error!(
579                                    "Failed to send stop message to agent {}: {}",
580                                    agent_id,
581                                    e
582                                );
583                            });
584                        }
585                        AgentMessageSender::Async(tx) => {
586                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
587                                log::error!(
588                                    "Failed to send stop message to agent {}: {}",
589                                    agent_id,
590                                    e
591                                );
592                            });
593                        }
594                    }
595                }
596            }
597
598            agent.lock().await.stop()?;
599        }
600
601        Ok(())
602    }
603
604    pub async fn set_agent_config(
605        &self,
606        agent_id: String,
607        config: AgentConfig,
608    ) -> Result<(), AgentError> {
609        let agent = {
610            let agents = self.agents.lock().unwrap();
611            let Some(a) = agents.get(&agent_id) else {
612                return Err(AgentError::AgentNotFound(agent_id.to_string()));
613            };
614            a.clone()
615        };
616
617        let agent_status = {
618            let agent = agent.lock().await;
619            agent.status().clone()
620        };
621        if agent_status == AgentStatus::Init {
622            agent.lock().await.set_config(config.clone())?;
623        } else if agent_status == AgentStatus::Start {
624            let tx = {
625                let agent_txs = self.agent_txs.lock().unwrap();
626                let Some(tx) = agent_txs.get(&agent_id) else {
627                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
628                };
629                tx.clone()
630            };
631            let message = AgentMessage::Config { config };
632            match tx {
633                AgentMessageSender::Sync(tx) => {
634                    tx.send(message).map_err(|_| {
635                        AgentError::SendMessageFailed("Failed to send config message".to_string())
636                    })?;
637                }
638                AgentMessageSender::Async(tx) => {
639                    tx.send(message).await.map_err(|_| {
640                        AgentError::SendMessageFailed("Failed to send config message".to_string())
641                    })?;
642                }
643            }
644        }
645        Ok(())
646    }
647
648    pub fn get_global_config(&self, def_name: &str) -> Option<AgentConfig> {
649        let global_configs = self.global_configs.lock().unwrap();
650        global_configs.get(def_name).cloned()
651    }
652
653    pub fn set_global_config(&self, def_name: &str, config: AgentConfig) {
654        let mut global_configs = self.global_configs.lock().unwrap();
655        global_configs.insert(def_name.to_string(), config);
656    }
657
658    pub async fn agent_input(
659        &self,
660        agent_id: String,
661        ctx: AgentContext,
662        data: AgentData,
663    ) -> Result<(), AgentError> {
664        let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
665            let agents = self.agents.lock().unwrap();
666            let Some(a) = agents.get(&agent_id) else {
667                return Err(AgentError::AgentNotFound(agent_id.to_string()));
668            };
669            a.clone()
670        };
671
672        let agent_status = {
673            let agent = agent.lock().await;
674            agent.status().clone()
675        };
676        if agent_status == AgentStatus::Start {
677            let ch = ctx.port().to_string();
678            let message = AgentMessage::Input { ctx, data };
679
680            let tx = {
681                let agent_txs = self.agent_txs.lock().unwrap();
682                let Some(tx) = agent_txs.get(&agent_id) else {
683                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
684                };
685                tx.clone()
686            };
687            match tx {
688                AgentMessageSender::Sync(tx) => {
689                    tx.send(message).map_err(|_| {
690                        AgentError::SendMessageFailed("Failed to send input message".to_string())
691                    })?;
692                }
693                AgentMessageSender::Async(tx) => {
694                    tx.send(message).await.map_err(|_| {
695                        AgentError::SendMessageFailed("Failed to send input message".to_string())
696                    })?;
697                }
698            }
699
700            self.emit_input(agent_id.to_string(), ch);
701        }
702        Ok(())
703    }
704
705    pub async fn send_agent_out(
706        &self,
707        agent_id: String,
708        ctx: AgentContext,
709        data: AgentData,
710    ) -> Result<(), AgentError> {
711        message::send_agent_out(self, agent_id, ctx, data).await
712    }
713
714    pub fn try_send_agent_out(
715        &self,
716        agent_id: String,
717        ctx: AgentContext,
718        data: AgentData,
719    ) -> Result<(), AgentError> {
720        message::try_send_agent_out(self, agent_id, ctx, data)
721    }
722
723    pub fn try_send_board_out(
724        &self,
725        name: String,
726        ctx: AgentContext,
727        data: AgentData,
728    ) -> Result<(), AgentError> {
729        message::try_send_board_out(self, name, ctx, data)
730    }
731
732    fn spawn_message_loop(&self) -> Result<(), AgentError> {
733        // TODO: settings for the channel size
734        let (tx, mut rx) = mpsc::channel(4096);
735        {
736            let mut tx_lock = self.tx.lock().unwrap();
737            *tx_lock = Some(tx);
738        }
739
740        // spawn the main loop
741        let askit = self.clone();
742        tokio::spawn(async move {
743            while let Some(message) = rx.recv().await {
744                use AgentEventMessage::*;
745
746                match message {
747                    AgentOut { agent, ctx, data } => {
748                        message::agent_out(&askit, agent, ctx, data).await;
749                    }
750                    BoardOut { name, ctx, data } => {
751                        message::board_out(&askit, name, ctx, data).await;
752                    }
753                }
754            }
755        });
756
757        Ok(())
758    }
759
760    async fn start_agent_flows(&self) -> Result<(), AgentError> {
761        let agent_flow_names;
762        {
763            let agent_flows = self.flows.lock().unwrap();
764            agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
765        }
766        for name in agent_flow_names {
767            self.start_agent_flow(&name).await.unwrap_or_else(|e| {
768                log::error!("Failed to start agent flow: {}", e);
769            });
770        }
771        Ok(())
772    }
773
774    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
775        let mut observers = self.observers.lock().unwrap();
776        let observer_id = new_observer_id();
777        observers.insert(observer_id, observer);
778        observer_id
779    }
780
781    pub fn unsubscribe(&self, observer_id: usize) {
782        let mut observers = self.observers.lock().unwrap();
783        observers.remove(&observer_id);
784    }
785
786    pub(crate) fn emit_error(&self, agent_id: String, message: String) {
787        self.notify_observers(ASKitEvent::AgentError(agent_id.clone(), message.clone()));
788    }
789
790    pub(crate) fn emit_input(&self, agent_id: String, ch: String) {
791        self.notify_observers(ASKitEvent::AgentIn(agent_id.clone(), ch.clone()));
792    }
793
794    pub(crate) fn emit_display(&self, agent_id: String, key: String, data: AgentData) {
795        self.notify_observers(ASKitEvent::AgentDisplay(
796            agent_id.clone(),
797            key.clone(),
798            data.clone(),
799        ));
800    }
801
802    fn notify_observers(&self, event: ASKitEvent) {
803        let observers = self.observers.lock().unwrap();
804        for (_id, observer) in observers.iter() {
805            observer.notify(event.clone());
806        }
807    }
808}
809
810#[derive(Clone, Debug)]
811pub enum ASKitEvent {
812    AgentIn(String, String),                 // (agent_id, channel)
813    AgentDisplay(String, String, AgentData), // (agent_id, key, data)
814    AgentError(String, String),              // (agent_id, message)
815}
816
817pub trait ASKitObserver {
818    fn notify(&self, event: ASKitEvent);
819}
820
821static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
822
823fn new_observer_id() -> usize {
824    OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
825}
826
827// Agent Message
828
829#[derive(Clone)]
830pub enum AgentMessageSender {
831    Sync(std::sync::mpsc::Sender<AgentMessage>),
832    Async(mpsc::Sender<AgentMessage>),
833}