agent_stream_kit/
askit.rs

1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::board_agent;
8
9use super::agent::{Agent, AgentMessage, AgentStatus, agent_new};
10use super::config::AgentConfig;
11use super::context::AgentContext;
12use super::data::AgentData;
13use super::definition::{AgentDefaultConfig, AgentDefinition, AgentDefinitions};
14use super::error::AgentError;
15use super::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
16use super::message::{self, AgentEventMessage};
17
18#[derive(Clone)]
19pub struct ASKit {
20    // agent id -> agent
21    pub(crate) agents:
22        Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,
23
24    // agent id -> sender
25    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
26
27    // board name -> [board out agent id]
28    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
29
30    // board name -> data
31    pub(crate) board_data: Arc<Mutex<HashMap<String, AgentData>>>,
32
33    // sourece agent id -> [target agent id / source handle / target handle]
34    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
35
36    // agent def name -> agent definition
37    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
38
39    // agent flows
40    pub(crate) flows: Arc<Mutex<AgentFlows>>,
41
42    // agent def name -> config
43    pub(crate) global_configs: Arc<Mutex<HashMap<String, AgentConfig>>>,
44
45    // message sender
46    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
47
48    // observers
49    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
50}
51
52impl ASKit {
53    pub fn new() -> Self {
54        Self {
55            agents: Default::default(),
56            agent_txs: Default::default(),
57            board_out_agents: Default::default(),
58            board_data: Default::default(),
59            edges: Default::default(),
60            defs: Default::default(),
61            flows: Default::default(),
62            global_configs: Default::default(),
63            tx: Arc::new(Mutex::new(None)),
64            observers: Default::default(),
65        }
66    }
67
68    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69        self.tx
70            .lock()
71            .unwrap()
72            .clone()
73            .ok_or(AgentError::TxNotInitialized)
74    }
75
76    pub fn init() -> Result<Self, AgentError> {
77        let askit = Self::new();
78        askit.register_agents();
79        Ok(askit)
80    }
81
82    fn register_agents(&self) {
83        board_agent::register_agents(self);
84    }
85
86    pub async fn ready(&self) -> Result<(), AgentError> {
87        self.spawn_message_loop()?;
88        self.start_agent_flows().await?;
89        Ok(())
90    }
91
92    pub fn quit(&self) {
93        let mut tx_lock = self.tx.lock().unwrap();
94        *tx_lock = None;
95    }
96
97    pub fn register_agent(&self, def: AgentDefinition) {
98        let mut defs = self.defs.lock().unwrap();
99        defs.insert(def.name.clone(), def);
100    }
101
102    pub fn get_agent_definitions(&self) -> AgentDefinitions {
103        let defs = self.defs.lock().unwrap();
104        defs.clone()
105    }
106
107    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
108        let defs = self.defs.lock().unwrap();
109        defs.get(def_name).cloned()
110    }
111
112    pub fn get_agent_default_config(&self, def_name: &str) -> Option<AgentDefaultConfig> {
113        let defs = self.defs.lock().unwrap();
114        let Some(def) = defs.get(def_name) else {
115            return None;
116        };
117        def.default_config.clone()
118    }
119
120    // // flow
121
122    pub fn get_agent_flows(&self) -> AgentFlows {
123        let flows = self.flows.lock().unwrap();
124        flows.clone()
125    }
126
127    pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
128        if !Self::is_valid_flow_name(name) {
129            return Err(AgentError::InvalidFlowName(name.into()));
130        }
131
132        let new_name = self.unique_flow_name(name);
133        let mut flows = self.flows.lock().unwrap();
134        let flow = AgentFlow::new(new_name.clone());
135        flows.insert(new_name, flow.clone());
136        Ok(flow)
137    }
138
139    pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
140        if !Self::is_valid_flow_name(new_name) {
141            return Err(AgentError::InvalidFlowName(new_name.into()));
142        }
143
144        // check if the new name is already used
145        let new_name = self.unique_flow_name(new_name);
146
147        let mut flows = self.flows.lock().unwrap();
148
149        // remove the original flow
150        let Some(mut flow) = flows.remove(old_name) else {
151            return Err(AgentError::RenameFlowFailed(old_name.into()));
152        };
153
154        // insert renamed flow
155        flow.set_name(new_name.clone());
156        flows.insert(new_name.clone(), flow);
157        Ok(new_name)
158    }
159
160    fn is_valid_flow_name(new_name: &str) -> bool {
161        // Check if the name is empty
162        if new_name.trim().is_empty() {
163            return false;
164        }
165
166        // Checks for path-like names:
167        if new_name.contains('/') {
168            // Disallow leading, trailing, or consecutive slashes
169            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
170                return false;
171            }
172            // Disallow segments that are "." or ".."
173            if new_name
174                .split('/')
175                .any(|segment| segment == "." || segment == "..")
176            {
177                return false;
178            }
179        }
180
181        // Check if the name contains invalid characters
182        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
183        for c in invalid_chars {
184            if new_name.contains(c) {
185                return false;
186            }
187        }
188
189        true
190    }
191
192    pub fn unique_flow_name(&self, name: &str) -> String {
193        let mut new_name = name.trim().to_string();
194        let mut i = 2;
195        let flows = self.flows.lock().unwrap();
196        while flows.contains_key(&new_name) {
197            new_name = format!("{}{}", name, i);
198            i += 1;
199        }
200        new_name
201    }
202
203    pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
204        let name = agent_flow.name();
205
206        // add the given flow into flows
207        {
208            let mut flows = self.flows.lock().unwrap();
209            if flows.contains_key(name) {
210                return Err(AgentError::DuplicateFlowName(name.into()));
211            }
212            flows.insert(name.into(), agent_flow.clone());
213        }
214
215        // add nodes into agents
216        for node in agent_flow.nodes().iter() {
217            self.add_agent(node).unwrap_or_else(|e| {
218                log::error!("Failed to add_agent_node {}: {}", node.id, e);
219            });
220        }
221
222        // add edges into edges
223        for edge in agent_flow.edges().iter() {
224            self.add_edge(edge).unwrap_or_else(|e| {
225                log::error!("Failed to add_edge {}: {}", edge.source, e);
226            });
227        }
228
229        Ok(())
230    }
231
232    pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
233        let flow = {
234            let mut flows = self.flows.lock().unwrap();
235            let Some(flow) = flows.remove(flow_name) else {
236                return Err(AgentError::FlowNotFound(flow_name.to_string()));
237            };
238            flow.clone()
239        };
240
241        flow.stop(self).await?;
242
243        // Remove all nodes and edges associated with the flow
244        for node in flow.nodes() {
245            self.remove_agent(&node.id).await?;
246        }
247        for edge in flow.edges() {
248            self.remove_edge(edge);
249        }
250
251        Ok(())
252    }
253
254    pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
255        let flow_name = flow.name();
256
257        let mut flows = self.flows.lock().unwrap();
258        flows.insert(flow_name.to_string(), flow);
259        Ok(())
260    }
261
262    pub fn add_agent_flow_node(
263        &self,
264        flow_name: &str,
265        node: &AgentFlowNode,
266    ) -> Result<(), AgentError> {
267        let mut flows = self.flows.lock().unwrap();
268        let Some(flow) = flows.get_mut(flow_name) else {
269            return Err(AgentError::FlowNotFound(flow_name.to_string()));
270        };
271        flow.add_node(node.clone());
272        self.add_agent(node)?;
273        Ok(())
274    }
275
276    pub(crate) fn add_agent(&self, node: &AgentFlowNode) -> Result<(), AgentError> {
277        let mut agents = self.agents.lock().unwrap();
278        if agents.contains_key(&node.id) {
279            return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
280        }
281        if let Ok(agent) = agent_new(
282            self.clone(),
283            node.id.clone(),
284            &node.def_name,
285            node.config.clone(),
286        ) {
287            agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
288            log::info!("Agent {} created", node.id);
289        } else {
290            return Err(AgentError::AgentCreationFailed(node.id.to_string()));
291        }
292        Ok(())
293    }
294
295    pub fn add_agent_flow_edge(
296        &self,
297        flow_name: &str,
298        edge: &AgentFlowEdge,
299    ) -> Result<(), AgentError> {
300        let mut flows = self.flows.lock().unwrap();
301        let Some(flow) = flows.get_mut(flow_name) else {
302            return Err(AgentError::FlowNotFound(flow_name.to_string()));
303        };
304        flow.add_edge(edge.clone());
305        self.add_edge(edge)?;
306        Ok(())
307    }
308
309    pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
310        // check if the source agent exists
311        {
312            let agents = self.agents.lock().unwrap();
313            if !agents.contains_key(&edge.source) {
314                return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
315            }
316        }
317
318        // check if handles are valid
319        if edge.source_handle.is_empty() {
320            return Err(AgentError::EmptySourceHandle);
321        }
322        if edge.target_handle.is_empty() {
323            return Err(AgentError::EmptyTargetHandle);
324        }
325
326        let mut edges = self.edges.lock().unwrap();
327        if let Some(targets) = edges.get_mut(&edge.source) {
328            if targets
329                .iter()
330                .any(|(target, source_handle, target_handle)| {
331                    *target == edge.target
332                        && *source_handle == edge.source_handle
333                        && *target_handle == edge.target_handle
334                })
335            {
336                return Err(AgentError::EdgeAlreadyExists);
337            }
338            targets.push((
339                edge.target.clone(),
340                edge.source_handle.clone(),
341                edge.target_handle.clone(),
342            ));
343        } else {
344            edges.insert(
345                edge.source.clone(),
346                vec![(
347                    edge.target.clone(),
348                    edge.source_handle.clone(),
349                    edge.target_handle.clone(),
350                )],
351            );
352        }
353        Ok(())
354    }
355
356    pub async fn remove_agent_flow_node(
357        &self,
358        flow_name: &str,
359        node_id: &str,
360    ) -> Result<(), AgentError> {
361        {
362            let mut flows = self.flows.lock().unwrap();
363            let Some(flow) = flows.get_mut(flow_name) else {
364                return Err(AgentError::FlowNotFound(flow_name.to_string()));
365            };
366            flow.remove_node(node_id);
367        }
368        self.remove_agent(node_id).await?;
369        Ok(())
370    }
371
372    pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
373        self.stop_agent(agent_id).await?;
374
375        // remove from edges
376        {
377            let mut edges = self.edges.lock().unwrap();
378            let mut sources_to_remove = Vec::new();
379            for (source, targets) in edges.iter_mut() {
380                targets.retain(|(target, _, _)| target != agent_id);
381                if targets.is_empty() {
382                    sources_to_remove.push(source.clone());
383                }
384            }
385            for source in sources_to_remove {
386                edges.remove(&source);
387            }
388            edges.remove(agent_id);
389        }
390
391        // remove from agents
392        {
393            let mut agents = self.agents.lock().unwrap();
394            agents.remove(agent_id);
395        }
396
397        Ok(())
398    }
399
400    pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
401        let mut flows = self.flows.lock().unwrap();
402        let Some(flow) = flows.get_mut(flow_name) else {
403            return Err(AgentError::FlowNotFound(flow_name.to_string()));
404        };
405        let Some(edge) = flow.remove_edge(edge_id) else {
406            return Err(AgentError::EdgeNotFound(edge_id.to_string()));
407        };
408        self.remove_edge(&edge);
409        Ok(())
410    }
411
412    pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
413        let mut edges = self.edges.lock().unwrap();
414        if let Some(targets) = edges.get_mut(&edge.source) {
415            targets.retain(|(target, source_handle, target_handle)| {
416                *target != edge.target
417                    || *source_handle != edge.source_handle
418                    || *target_handle != edge.target_handle
419            });
420            if targets.is_empty() {
421                edges.remove(&edge.source);
422            }
423        }
424    }
425
426    pub fn copy_sub_flow(
427        &self,
428        nodes: &Vec<AgentFlowNode>,
429        edges: &Vec<AgentFlowEdge>,
430    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
431        flow::copy_sub_flow(nodes, edges)
432    }
433
434    pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
435        let flows = self.flows.lock().unwrap();
436        let Some(flow) = flows.get(name) else {
437            return Err(AgentError::FlowNotFound(name.to_string()));
438        };
439        flow.start(self).await?;
440        Ok(())
441    }
442
443    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
444        let agent = {
445            let agents = self.agents.lock().unwrap();
446            let Some(a) = agents.get(agent_id) else {
447                return Err(AgentError::AgentNotFound(agent_id.to_string()));
448            };
449            a.clone()
450        };
451        let def_name = {
452            let agent = agent.lock().await;
453            agent.def_name().to_string()
454        };
455        let uses_native_thread = {
456            let defs = self.defs.lock().unwrap();
457            let Some(def) = defs.get(&def_name) else {
458                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
459            };
460            def.native_thread
461        };
462        let agent_status = {
463            let agent = agent.lock().await;
464            agent.status().clone()
465        };
466        if agent_status == AgentStatus::Init {
467            log::info!("Starting agent {}", agent_id);
468
469            if uses_native_thread {
470                let (tx, rx) = std::sync::mpsc::channel();
471
472                {
473                    let mut agent_txs = self.agent_txs.lock().unwrap();
474                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
475                };
476
477                let agent_id = agent_id.to_string();
478                std::thread::spawn(async move || {
479                    if let Err(e) = agent.lock().await.start() {
480                        log::error!("Failed to start agent {}: {}", agent_id, e);
481                    }
482
483                    while let Ok(message) = rx.recv() {
484                        match message {
485                            AgentMessage::Input { ctx, data } => {
486                                agent
487                                    .lock()
488                                    .await
489                                    .process(ctx, data)
490                                    .await
491                                    .unwrap_or_else(|e| {
492                                        log::error!("Process Error {}: {}", agent_id, e);
493                                    });
494                            }
495                            AgentMessage::Config { config } => {
496                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
497                                    log::error!("Config Error {}: {}", agent_id, e);
498                                });
499                            }
500                            AgentMessage::Stop => {
501                                break;
502                            }
503                        }
504                    }
505                });
506            } else {
507                let (tx, mut rx) = mpsc::channel(32);
508
509                {
510                    let mut agent_txs = self.agent_txs.lock().unwrap();
511                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
512                };
513
514                let agent_id = agent_id.to_string();
515                tokio::spawn(async move {
516                    {
517                        let mut agent_guard = agent.lock().await;
518                        if let Err(e) = agent_guard.start() {
519                            log::error!("Failed to start agent {}: {}", agent_id, e);
520                        }
521                    }
522
523                    while let Some(message) = rx.recv().await {
524                        match message {
525                            AgentMessage::Input { ctx, data } => {
526                                agent
527                                    .lock()
528                                    .await
529                                    .process(ctx, data)
530                                    .await
531                                    .unwrap_or_else(|e| {
532                                        log::error!("Process Error {}: {}", agent_id, e);
533                                    });
534                            }
535                            AgentMessage::Config { config } => {
536                                agent.lock().await.set_config(config).unwrap_or_else(|e| {
537                                    log::error!("Config Error {}: {}", agent_id, e);
538                                });
539                            }
540                            AgentMessage::Stop => {
541                                rx.close();
542                                return;
543                            }
544                        }
545                    }
546                });
547            }
548        }
549        Ok(())
550    }
551
552    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
553        let agent = {
554            let agents = self.agents.lock().unwrap();
555            let Some(a) = agents.get(agent_id) else {
556                return Err(AgentError::AgentNotFound(agent_id.to_string()));
557            };
558            a.clone()
559        };
560
561        let agent_status = {
562            let agent = agent.lock().await;
563            agent.status().clone()
564        };
565        if agent_status == AgentStatus::Start {
566            log::info!("Stopping agent {}", agent_id);
567
568            {
569                let mut agent_txs = self.agent_txs.lock().unwrap();
570                if let Some(tx) = agent_txs.remove(agent_id) {
571                    match tx {
572                        AgentMessageSender::Sync(tx) => {
573                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
574                                log::error!(
575                                    "Failed to send stop message to agent {}: {}",
576                                    agent_id,
577                                    e
578                                );
579                            });
580                        }
581                        AgentMessageSender::Async(tx) => {
582                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
583                                log::error!(
584                                    "Failed to send stop message to agent {}: {}",
585                                    agent_id,
586                                    e
587                                );
588                            });
589                        }
590                    }
591                }
592            }
593
594            agent.lock().await.stop()?;
595        }
596
597        Ok(())
598    }
599
600    pub async fn set_agent_config(
601        &self,
602        agent_id: String,
603        config: AgentConfig,
604    ) -> Result<(), AgentError> {
605        let agent = {
606            let agents = self.agents.lock().unwrap();
607            let Some(a) = agents.get(&agent_id) else {
608                return Err(AgentError::AgentNotFound(agent_id.to_string()));
609            };
610            a.clone()
611        };
612
613        let agent_status = {
614            let agent = agent.lock().await;
615            agent.status().clone()
616        };
617        if agent_status == AgentStatus::Init {
618            agent.lock().await.set_config(config.clone())?;
619        } else if agent_status == AgentStatus::Start {
620            let tx = {
621                let agent_txs = self.agent_txs.lock().unwrap();
622                let Some(tx) = agent_txs.get(&agent_id) else {
623                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
624                };
625                tx.clone()
626            };
627            let message = AgentMessage::Config { config };
628            match tx {
629                AgentMessageSender::Sync(tx) => {
630                    tx.send(message).map_err(|_| {
631                        AgentError::SendMessageFailed("Failed to send config message".to_string())
632                    })?;
633                }
634                AgentMessageSender::Async(tx) => {
635                    tx.send(message).await.map_err(|_| {
636                        AgentError::SendMessageFailed("Failed to send config message".to_string())
637                    })?;
638                }
639            }
640        }
641        Ok(())
642    }
643
644    pub fn get_global_config(&self, def_name: &str) -> Option<AgentConfig> {
645        let global_configs = self.global_configs.lock().unwrap();
646        global_configs.get(def_name).cloned()
647    }
648
649    pub fn set_global_config(&self, def_name: &str, config: AgentConfig) {
650        let mut global_configs = self.global_configs.lock().unwrap();
651        global_configs.insert(def_name.to_string(), config);
652    }
653
654    pub async fn agent_input(
655        &self,
656        agent_id: String,
657        ctx: AgentContext,
658        data: AgentData,
659    ) -> Result<(), AgentError> {
660        let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
661            let agents = self.agents.lock().unwrap();
662            let Some(a) = agents.get(&agent_id) else {
663                return Err(AgentError::AgentNotFound(agent_id.to_string()));
664            };
665            a.clone()
666        };
667
668        let agent_status = {
669            let agent = agent.lock().await;
670            agent.status().clone()
671        };
672        if agent_status == AgentStatus::Start {
673            let ch = ctx.ch().to_string();
674            let message = AgentMessage::Input { ctx, data };
675
676            let tx = {
677                let agent_txs = self.agent_txs.lock().unwrap();
678                let Some(tx) = agent_txs.get(&agent_id) else {
679                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
680                };
681                tx.clone()
682            };
683            match tx {
684                AgentMessageSender::Sync(tx) => {
685                    tx.send(message).map_err(|_| {
686                        AgentError::SendMessageFailed("Failed to send input message".to_string())
687                    })?;
688                }
689                AgentMessageSender::Async(tx) => {
690                    tx.send(message).await.map_err(|_| {
691                        AgentError::SendMessageFailed("Failed to send input message".to_string())
692                    })?;
693                }
694            }
695
696            self.emit_input(agent_id.to_string(), ch);
697        }
698        Ok(())
699    }
700
701    pub async fn send_agent_out(
702        &self,
703        agent_id: String,
704        ctx: AgentContext,
705        data: AgentData,
706    ) -> Result<(), AgentError> {
707        message::send_agent_out(self, agent_id, ctx, data).await
708    }
709
710    pub fn try_send_agent_out(
711        &self,
712        agent_id: String,
713        ctx: AgentContext,
714        data: AgentData,
715    ) -> Result<(), AgentError> {
716        message::try_send_agent_out(self, agent_id, ctx, data)
717    }
718
719    pub fn try_send_board_out(
720        &self,
721        name: String,
722        ctx: AgentContext,
723        data: AgentData,
724    ) -> Result<(), AgentError> {
725        message::try_send_board_out(self, name, ctx, data)
726    }
727
728    fn spawn_message_loop(&self) -> Result<(), AgentError> {
729        // TODO: settings for the channel size
730        let (tx, mut rx) = mpsc::channel(4096);
731        {
732            let mut tx_lock = self.tx.lock().unwrap();
733            *tx_lock = Some(tx);
734        }
735
736        // spawn the main loop
737        let askit = self.clone();
738        tokio::spawn(async move {
739            while let Some(message) = rx.recv().await {
740                use AgentEventMessage::*;
741
742                match message {
743                    AgentOut { agent, ctx, data } => {
744                        message::agent_out(&askit, agent, ctx, data).await;
745                    }
746                    BoardOut { name, ctx, data } => {
747                        message::board_out(&askit, name, ctx, data).await;
748                    }
749                }
750            }
751        });
752
753        Ok(())
754    }
755
756    async fn start_agent_flows(&self) -> Result<(), AgentError> {
757        let agent_flow_names;
758        {
759            let agent_flows = self.flows.lock().unwrap();
760            agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
761        }
762        for name in agent_flow_names {
763            self.start_agent_flow(&name).await.unwrap_or_else(|e| {
764                log::error!("Failed to start agent flow: {}", e);
765            });
766        }
767        Ok(())
768    }
769
770    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
771        let mut observers = self.observers.lock().unwrap();
772        let observer_id = new_observer_id();
773        observers.insert(observer_id, observer);
774        observer_id
775    }
776
777    pub fn unsubscribe(&self, observer_id: usize) {
778        let mut observers = self.observers.lock().unwrap();
779        observers.remove(&observer_id);
780    }
781
782    pub(crate) fn emit_error(&self, agent_id: String, message: String) {
783        self.notify_observers(ASKitEvent::AgentError(agent_id.clone(), message.clone()));
784    }
785
786    pub(crate) fn emit_input(&self, agent_id: String, ch: String) {
787        self.notify_observers(ASKitEvent::AgentIn(agent_id.clone(), ch.clone()));
788    }
789
790    pub(crate) fn emit_display(&self, agent_id: String, key: String, data: AgentData) {
791        self.notify_observers(ASKitEvent::AgentDisplay(
792            agent_id.clone(),
793            key.clone(),
794            data.clone(),
795        ));
796    }
797
798    fn notify_observers(&self, event: ASKitEvent) {
799        let observers = self.observers.lock().unwrap();
800        for (_id, observer) in observers.iter() {
801            observer.notify(event.clone());
802        }
803    }
804}
805
806#[derive(Clone, Debug)]
807pub enum ASKitEvent {
808    AgentIn(String, String),                 // (agent_id, channel)
809    AgentDisplay(String, String, AgentData), // (agent_id, key, data)
810    AgentError(String, String),              // (agent_id, message)
811}
812
813pub trait ASKitObserver {
814    fn notify(&self, event: ASKitEvent);
815}
816
817static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
818
819fn new_observer_id() -> usize {
820    OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
821}
822
823// Agent Message
824
825#[derive(Clone)]
826pub enum AgentMessageSender {
827    Sync(std::sync::mpsc::Sender<AgentMessage>),
828    Async(mpsc::Sender<AgentMessage>),
829}