agent_stream_kit/
askit.rs

1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::{Mutex as AsyncMutex, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::message::{self, AgentEventMessage};
13use crate::registry;
14use crate::spec::{self, AgentSpec, AgentStream, AgentStreams, ChannelSpec};
15use crate::value::AgentValue;
16
17const MESSAGE_LIMIT: usize = 1024;
18
19#[derive(Clone)]
20pub struct ASKit {
21    // agent id -> agent
22    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
23
24    // agent id -> sender
25    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,
26
27    // board name -> [board out agent id]
28    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
29
30    // board name -> value
31    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
32
33    // source agent id -> [target agent id / source handle / target handle]
34    pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
35
36    // agent def name -> agent definition
37    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
38
39    // agent streams (stream id -> stream)
40    pub(crate) streams: Arc<Mutex<AgentStreams>>,
41
42    // agent def name -> config
43    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
44
45    // message sender
46    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
47
48    // observers
49    pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
50}
51
52impl ASKit {
53    pub fn new() -> Self {
54        Self {
55            agents: Default::default(),
56            agent_txs: Default::default(),
57            board_out_agents: Default::default(),
58            board_value: Default::default(),
59            channels: Default::default(),
60            defs: Default::default(),
61            streams: Default::default(),
62            global_configs_map: Default::default(),
63            tx: Arc::new(Mutex::new(None)),
64            observers: Default::default(),
65        }
66    }
67
68    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69        self.tx
70            .lock()
71            .unwrap()
72            .clone()
73            .ok_or(AgentError::TxNotInitialized)
74    }
75
76    pub fn init() -> Result<Self, AgentError> {
77        let askit = Self::new();
78        askit.register_agents();
79        Ok(askit)
80    }
81
82    fn register_agents(&self) {
83        registry::register_inventory_agents(self);
84    }
85
86    pub async fn ready(&self) -> Result<(), AgentError> {
87        self.spawn_message_loop().await?;
88        self.start_agent_streams().await?;
89        Ok(())
90    }
91
92    pub fn quit(&self) {
93        let mut tx_lock = self.tx.lock().unwrap();
94        *tx_lock = None;
95    }
96
97    pub fn register_agent_definiton(&self, def: AgentDefinition) {
98        let def_name = def.name.clone();
99        let def_global_configs = def.global_configs.clone();
100
101        let mut defs = self.defs.lock().unwrap();
102        defs.insert(def.name.clone(), def);
103
104        // if there is a global config, set it
105        if let Some(def_global_configs) = def_global_configs {
106            let mut new_configs = AgentConfigs::default();
107            for (key, config_entry) in def_global_configs.iter() {
108                new_configs.set(key.clone(), config_entry.value.clone());
109            }
110            self.set_global_configs(def_name, new_configs);
111        }
112    }
113
114    pub fn get_agent_definitions(&self) -> AgentDefinitions {
115        let defs = self.defs.lock().unwrap();
116        defs.clone()
117    }
118
119    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
120        let defs = self.defs.lock().unwrap();
121        defs.get(def_name).cloned()
122    }
123
124    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
125        let defs = self.defs.lock().unwrap();
126        let Some(def) = defs.get(def_name) else {
127            return None;
128        };
129        def.configs.clone()
130    }
131
132    pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
133        let agents = self.agents.lock().unwrap();
134        let Some(agent) = agents.get(agent_id) else {
135            return None;
136        };
137        let agent = agent.blocking_lock();
138        Some(agent.spec().clone())
139    }
140
141    // streams
142
143    pub fn get_agent_streams(&self) -> AgentStreams {
144        let streams = self.streams.lock().unwrap();
145        streams.clone()
146    }
147
148    pub fn new_agent_stream(&self, name: &str) -> Result<AgentStream, AgentError> {
149        if !Self::is_valid_stream_name(name) {
150            return Err(AgentError::InvalidStreamName(name.into()));
151        }
152
153        let new_name = self.unique_stream_name(name);
154        let mut streams = self.streams.lock().unwrap();
155        let stream = AgentStream::new(new_name.clone());
156        streams.insert(stream.id().to_string(), stream.clone());
157        Ok(stream)
158    }
159
160    pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
161        if !Self::is_valid_stream_name(new_name) {
162            return Err(AgentError::InvalidStreamName(new_name.into()));
163        }
164
165        // check if the new name is already used
166        let new_name = self.unique_stream_name(new_name);
167
168        let mut streams = self.streams.lock().unwrap();
169
170        // remove the original stream
171        let Some(mut stream) = streams.swap_remove(id) else {
172            return Err(AgentError::RenameStreamFailed(id.into()));
173        };
174
175        // insert renamed stream
176        stream.set_name(new_name.clone());
177        streams.insert(stream.id().to_string(), stream);
178        Ok(new_name)
179    }
180
181    fn is_valid_stream_name(new_name: &str) -> bool {
182        // Check if the name is empty
183        if new_name.trim().is_empty() {
184            return false;
185        }
186
187        // Checks for path-like names:
188        if new_name.contains('/') {
189            // Disallow leading, trailing, or consecutive slashes
190            if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
191                return false;
192            }
193            // Disallow segments that are "." or ".."
194            if new_name
195                .split('/')
196                .any(|segment| segment == "." || segment == "..")
197            {
198                return false;
199            }
200        }
201
202        // Check if the name contains invalid characters
203        let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
204        for c in invalid_chars {
205            if new_name.contains(c) {
206                return false;
207            }
208        }
209
210        true
211    }
212
213    pub fn unique_stream_name(&self, name: &str) -> String {
214        let mut new_name = name.trim().to_string();
215        let mut i = 2;
216        let streams = self.streams.lock().unwrap();
217        while streams.values().any(|stream| stream.name() == new_name) {
218            new_name = format!("{}{}", name, i);
219            i += 1;
220        }
221        new_name
222    }
223
224    pub fn add_agent_stream(&self, agent_stream: &AgentStream) -> Result<(), AgentError> {
225        let id = agent_stream.id();
226
227        // add the given stream into streams
228        {
229            let mut streams = self.streams.lock().unwrap();
230            if streams.contains_key(id) {
231                return Err(AgentError::DuplicateId(id.into()));
232            }
233            streams.insert(id.to_string(), agent_stream.clone());
234        }
235
236        // add agents
237        for agent in agent_stream.agents().iter() {
238            if let Err(e) = self.add_agent_internal(id.to_string(), agent.clone()) {
239                log::error!("Failed to add_agent {}: {}", agent.id, e);
240            }
241        }
242
243        // add channels
244        for channel in agent_stream.channels().iter() {
245            self.add_channel_internal(channel.clone()).unwrap_or_else(|e| {
246                log::error!("Failed to add_channel {}: {}", channel.source, e);
247            });
248        }
249
250        Ok(())
251    }
252
253    pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
254        let stream = {
255            let mut streams = self.streams.lock().unwrap();
256            let Some(stream) = streams.swap_remove(id) else {
257                return Err(AgentError::StreamNotFound(id.to_string()));
258            };
259            stream.clone()
260        };
261
262        stream.stop(self).await?;
263
264        // Remove all agents and channels associated with the stream
265        for agent in stream.agents() {
266            self.remove_agent_internal(&agent.id).await?;
267        }
268        for channel in stream.channels() {
269            self.remove_channel_internal(channel);
270        }
271
272        Ok(())
273    }
274
275    pub fn insert_agent_stream(&self, stream: AgentStream) -> Result<(), AgentError> {
276        let stream_id = stream.id();
277
278        let mut streams = self.streams.lock().unwrap();
279        streams.insert(stream_id.to_string(), stream);
280        Ok(())
281    }
282
283    /// Create a new agent spec from the given agent definition name.
284    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
285        let def = self
286            .get_agent_definition(def_name)
287            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
288        Ok(AgentSpec::from_def(&def))
289    }
290
291    /// Add an agent to the specified stream.
292    pub fn add_agent(
293        &self,
294        stream_id: String,
295        spec: AgentSpec,
296    ) -> Result<(), AgentError> {
297        let mut streams = self.streams.lock().unwrap();
298        let Some(stream) = streams.get_mut(&stream_id) else {
299            return Err(AgentError::StreamNotFound(stream_id.to_string()));
300        };
301        self.add_agent_internal(stream_id, spec.clone())?;
302        stream.add_agent(spec.clone());
303        Ok(())
304    }
305
306    fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
307        let mut agents = self.agents.lock().unwrap();
308        if agents.contains_key(&spec.id) {
309            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
310        }
311        let spec_id = spec.id.clone();
312        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
313        agent.set_stream_id(stream_id);
314        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
315        Ok(())
316    }
317
318    /// Get the agent by id.
319    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
320        let agents = self.agents.lock().unwrap();
321        agents.get(agent_id).cloned()
322    }
323
324    pub fn add_channel(
325        &self,
326        stream_id: &str,
327        channel: ChannelSpec,
328    ) -> Result<(), AgentError> {
329        let mut streams = self.streams.lock().unwrap();
330        let Some(stream) = streams.get_mut(stream_id) else {
331            return Err(AgentError::StreamNotFound(stream_id.to_string()));
332        };
333        stream.add_channels(channel.clone());
334        self.add_channel_internal(channel)?;
335        Ok(())
336    }
337
338    fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
339        // check if the source agent exists
340        {
341            let agents = self.agents.lock().unwrap();
342            if !agents.contains_key(&channel.source) {
343                return Err(AgentError::SourceAgentNotFound(channel.source.to_string()));
344            }
345        }
346
347        // check if handles are valid
348        if channel.source_handle.is_empty() {
349            return Err(AgentError::EmptySourceHandle);
350        }
351        if channel.target_handle.is_empty() {
352            return Err(AgentError::EmptyTargetHandle);
353        }
354
355        let mut channels = self.channels.lock().unwrap();
356        if let Some(targets) = channels.get_mut(&channel.source) {
357            if targets
358                .iter()
359                .any(|(target, source_handle, target_handle)| {
360                    *target == channel.target
361                        && *source_handle == channel.source_handle
362                        && *target_handle == channel.target_handle
363                })
364            {
365                return Err(AgentError::ChannelAlreadyExists);
366            }
367            targets.push((
368                channel.target,
369                channel.source_handle,
370                channel.target_handle,
371            ));
372        } else {
373            channels.insert(
374                channel.source,
375                vec![(
376                    channel.target,
377                    channel.source_handle,
378                    channel.target_handle,
379                )],
380            );
381        }
382        Ok(())
383    }
384
385    pub async fn remove_agent(
386        &self,
387        stream_id: &str,
388        agent_id: &str,
389    ) -> Result<(), AgentError> {
390        {
391            let mut streams = self.streams.lock().unwrap();
392            let Some(stream) = streams.get_mut(stream_id) else {
393                return Err(AgentError::StreamNotFound(stream_id.to_string()));
394            };
395            stream.remove_agent(agent_id);
396        }
397        self.remove_agent_internal(agent_id).await?;
398        Ok(())
399    }
400
401    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
402        self.stop_agent(agent_id).await?;
403
404        // remove from channels
405        {
406            let mut channels = self.channels.lock().unwrap();
407            let mut sources_to_remove = Vec::new();
408            for (source, targets) in channels.iter_mut() {
409                targets.retain(|(target, _, _)| target != agent_id);
410                if targets.is_empty() {
411                    sources_to_remove.push(source.clone());
412                }
413            }
414            for source in sources_to_remove {
415                channels.swap_remove(&source);
416            }
417            channels.swap_remove(agent_id);
418        }
419
420        // remove from agents
421        {
422            let mut agents = self.agents.lock().unwrap();
423            agents.swap_remove(agent_id);
424        }
425
426        Ok(())
427    }
428
429    pub fn remove_channel(
430        &self,
431        stream_id: &str,
432        channel_id: &str,
433    ) -> Result<(), AgentError> {
434        let mut streams = self.streams.lock().unwrap();
435        let Some(stream) = streams.get_mut(stream_id) else {
436            return Err(AgentError::StreamNotFound(stream_id.to_string()));
437        };
438        let Some(channel) = stream.remove_channel(channel_id) else {
439            return Err(AgentError::ChannelNotFound(channel_id.to_string()));
440        };
441        self.remove_channel_internal(&channel);
442        Ok(())
443    }
444
445    fn remove_channel_internal(&self, channel: &ChannelSpec) {
446        let mut channels = self.channels.lock().unwrap();
447        if let Some(targets) = channels.get_mut(&channel.source) {
448            targets.retain(|(target, source_handle, target_handle)| {
449                *target != channel.target
450                    || *source_handle != channel.source_handle
451                    || *target_handle != channel.target_handle
452            });
453            if targets.is_empty() {
454                channels.swap_remove(&channel.source);
455            }
456        }
457    }
458
459    pub fn copy_sub_stream(
460        &self,
461        agents: &Vec<AgentSpec>,
462        channels: &Vec<ChannelSpec>,
463    ) -> (Vec<AgentSpec>, Vec<ChannelSpec>) {
464        spec::copy_sub_stream(agents, channels)
465    }
466
467    pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
468        let stream = {
469            let streams = self.streams.lock().unwrap();
470            let Some(stream) = streams.get(id) else {
471                return Err(AgentError::StreamNotFound(id.to_string()));
472            };
473            stream.clone()
474        };
475        stream.start(self).await?;
476        Ok(())
477    }
478
479    pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
480        let stream = {
481            let streams = self.streams.lock().unwrap();
482            let Some(stream) = streams.get(id) else {
483                return Err(AgentError::StreamNotFound(id.to_string()));
484            };
485            stream.clone()
486        };
487        stream.stop(self).await?;
488        Ok(())
489    }
490
491    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
492        let agent = {
493            let agents = self.agents.lock().unwrap();
494            let Some(a) = agents.get(agent_id) else {
495                return Err(AgentError::AgentNotFound(agent_id.to_string()));
496            };
497            a.clone()
498        };
499        let def_name = {
500            let agent = agent.lock().await;
501            agent.def_name().to_string()
502        };
503        let uses_native_thread = {
504            let defs = self.defs.lock().unwrap();
505            let Some(def) = defs.get(&def_name) else {
506                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
507            };
508            def.native_thread
509        };
510        let agent_status = {
511            let agent = agent.lock().await;
512            agent.status().clone()
513        };
514        if agent_status == AgentStatus::Init {
515            log::info!("Starting agent {}", agent_id);
516
517            if uses_native_thread {
518                let (tx, rx) = std::sync::mpsc::channel();
519
520                {
521                    let mut agent_txs = self.agent_txs.lock().unwrap();
522                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
523                };
524
525                let agent_id = agent_id.to_string();
526                std::thread::spawn(async move || {
527                    if let Err(e) = agent.lock().await.start().await {
528                        log::error!("Failed to start agent {}: {}", agent_id, e);
529                    }
530
531                    while let Ok(message) = rx.recv() {
532                        match message {
533                            AgentMessage::Input { ctx, pin, value } => {
534                                agent
535                                    .lock()
536                                    .await
537                                    .process(ctx, pin, value)
538                                    .await
539                                    .unwrap_or_else(|e| {
540                                        log::error!("Process Error {}: {}", agent_id, e);
541                                    });
542                            }
543                            AgentMessage::Config { configs } => {
544                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
545                                    log::error!("Config Error {}: {}", agent_id, e);
546                                });
547                            }
548                            AgentMessage::Stop => {
549                                break;
550                            }
551                        }
552                    }
553                });
554            } else {
555                let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
556
557                {
558                    let mut agent_txs = self.agent_txs.lock().unwrap();
559                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
560                };
561
562                let agent_id = agent_id.to_string();
563                tokio::spawn(async move {
564                    {
565                        let mut agent_guard = agent.lock().await;
566                        if let Err(e) = agent_guard.start().await {
567                            log::error!("Failed to start agent {}: {}", agent_id, e);
568                        }
569                    }
570
571                    while let Some(message) = rx.recv().await {
572                        match message {
573                            AgentMessage::Input { ctx, pin, value } => {
574                                agent
575                                    .lock()
576                                    .await
577                                    .process(ctx, pin, value)
578                                    .await
579                                    .unwrap_or_else(|e| {
580                                        log::error!("Process Error {}: {}", agent_id, e);
581                                    });
582                            }
583                            AgentMessage::Config { configs } => {
584                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
585                                    log::error!("Config Error {}: {}", agent_id, e);
586                                });
587                            }
588                            AgentMessage::Stop => {
589                                rx.close();
590                                return;
591                            }
592                        }
593                    }
594                });
595                tokio::task::yield_now().await;
596            }
597        }
598        Ok(())
599    }
600
601    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
602        let agent = {
603            let agents = self.agents.lock().unwrap();
604            let Some(a) = agents.get(agent_id) else {
605                return Err(AgentError::AgentNotFound(agent_id.to_string()));
606            };
607            a.clone()
608        };
609
610        let agent_status = {
611            let agent = agent.lock().await;
612            agent.status().clone()
613        };
614        if agent_status == AgentStatus::Start {
615            log::info!("Stopping agent {}", agent_id);
616
617            {
618                let mut agent_txs = self.agent_txs.lock().unwrap();
619                if let Some(tx) = agent_txs.swap_remove(agent_id) {
620                    match tx {
621                        AgentMessageSender::Sync(tx) => {
622                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
623                                log::error!(
624                                    "Failed to send stop message to agent {}: {}",
625                                    agent_id,
626                                    e
627                                );
628                            });
629                        }
630                        AgentMessageSender::Async(tx) => {
631                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
632                                log::error!(
633                                    "Failed to send stop message to agent {}: {}",
634                                    agent_id,
635                                    e
636                                );
637                            });
638                        }
639                    }
640                }
641            }
642
643            agent.lock().await.stop().await?;
644        }
645
646        Ok(())
647    }
648
649    pub async fn set_agent_configs(
650        &self,
651        agent_id: String,
652        configs: AgentConfigs,
653    ) -> Result<(), AgentError> {
654        let agent = {
655            let agents = self.agents.lock().unwrap();
656            let Some(a) = agents.get(&agent_id) else {
657                return Err(AgentError::AgentNotFound(agent_id.to_string()));
658            };
659            a.clone()
660        };
661
662        let agent_status = {
663            let agent = agent.lock().await;
664            agent.status().clone()
665        };
666        if agent_status == AgentStatus::Init {
667            agent.lock().await.set_configs(configs.clone())?;
668        } else if agent_status == AgentStatus::Start {
669            let tx = {
670                let agent_txs = self.agent_txs.lock().unwrap();
671                let Some(tx) = agent_txs.get(&agent_id) else {
672                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
673                };
674                tx.clone()
675            };
676            let message = AgentMessage::Config { configs };
677            match tx {
678                AgentMessageSender::Sync(tx) => {
679                    tx.send(message).map_err(|_| {
680                        AgentError::SendMessageFailed("Failed to send config message".to_string())
681                    })?;
682                }
683                AgentMessageSender::Async(tx) => {
684                    tx.send(message).await.map_err(|_| {
685                        AgentError::SendMessageFailed("Failed to send config message".to_string())
686                    })?;
687                }
688            }
689        }
690        Ok(())
691    }
692
693    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
694        let global_configs_map = self.global_configs_map.lock().unwrap();
695        global_configs_map.get(def_name).cloned()
696    }
697
698    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
699        let mut global_configs_map = self.global_configs_map.lock().unwrap();
700
701        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
702            global_configs_map.insert(def_name, configs);
703            return;
704        };
705
706        for (key, value) in configs {
707            existing_configs.set(key, value);
708        }
709    }
710
711    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
712        let global_configs_map = self.global_configs_map.lock().unwrap();
713        global_configs_map.clone()
714    }
715
716    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
717        for (agent_name, new_configs) in new_configs_map {
718            self.set_global_configs(agent_name, new_configs);
719        }
720    }
721
722    pub async fn agent_input(
723        &self,
724        agent_id: String,
725        ctx: AgentContext,
726        pin: String,
727        value: AgentValue,
728    ) -> Result<(), AgentError> {
729        let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
730            let agents = self.agents.lock().unwrap();
731            let Some(a) = agents.get(&agent_id) else {
732                return Err(AgentError::AgentNotFound(agent_id.to_string()));
733            };
734            a.clone()
735        };
736
737        let agent_status = {
738            let agent = agent.lock().await;
739            agent.status().clone()
740        };
741        if agent_status != AgentStatus::Start {
742            return Ok(());
743        }
744
745        if pin.starts_with("config:") {
746            let config_key = pin[7..].to_string();
747            let mut agent = agent.lock().await;
748            agent.set_config(config_key.clone(), value.clone())?;
749            return Ok(());
750        }
751
752        let message = AgentMessage::Input {
753            ctx,
754            pin: pin.clone(),
755            value,
756        };
757
758        let tx = {
759            let agent_txs = self.agent_txs.lock().unwrap();
760            let Some(tx) = agent_txs.get(&agent_id) else {
761                return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
762            };
763            tx.clone()
764        };
765        match tx {
766            AgentMessageSender::Sync(tx) => {
767                tx.send(message).map_err(|_| {
768                    AgentError::SendMessageFailed("Failed to send input message".to_string())
769                })?;
770            }
771            AgentMessageSender::Async(tx) => {
772                tx.send(message).await.map_err(|_| {
773                    AgentError::SendMessageFailed("Failed to send input message".to_string())
774                })?;
775            }
776        }
777
778        self.emit_agent_input(agent_id.to_string(), pin);
779
780        Ok(())
781    }
782
783    pub async fn send_agent_out(
784        &self,
785        agent_id: String,
786        ctx: AgentContext,
787        pin: String,
788        value: AgentValue,
789    ) -> Result<(), AgentError> {
790        message::send_agent_out(self, agent_id, ctx, pin, value).await
791    }
792
793    pub fn try_send_agent_out(
794        &self,
795        agent_id: String,
796        ctx: AgentContext,
797        pin: String,
798        value: AgentValue,
799    ) -> Result<(), AgentError> {
800        message::try_send_agent_out(self, agent_id, ctx, pin, value)
801    }
802
803    pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
804        self.try_send_board_out(name, AgentContext::new(), value)
805    }
806
807    pub(crate) fn try_send_board_out(
808        &self,
809        name: String,
810        ctx: AgentContext,
811        value: AgentValue,
812    ) -> Result<(), AgentError> {
813        message::try_send_board_out(self, name, ctx, value)
814    }
815
816    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
817        // TODO: settings for the channel size
818        let (tx, mut rx) = mpsc::channel(4096);
819        {
820            let mut tx_lock = self.tx.lock().unwrap();
821            *tx_lock = Some(tx);
822        }
823
824        // spawn the main loop
825        let askit = self.clone();
826        tokio::spawn(async move {
827            while let Some(message) = rx.recv().await {
828                use AgentEventMessage::*;
829
830                match message {
831                    AgentOut {
832                        agent,
833                        ctx,
834                        pin,
835                        value,
836                    } => {
837                        message::agent_out(&askit, agent, ctx, pin, value).await;
838                    }
839                    BoardOut { name, ctx, value } => {
840                        message::board_out(&askit, name, ctx, value).await;
841                    }
842                }
843            }
844        });
845
846        tokio::task::yield_now().await;
847
848        Ok(())
849    }
850
851    async fn start_agent_streams(&self) -> Result<(), AgentError> {
852        let agent_stream_ids;
853        {
854            let agent_streams = self.streams.lock().unwrap();
855            agent_stream_ids = agent_streams.keys().cloned().collect::<Vec<_>>();
856        }
857        for id in agent_stream_ids {
858            self.start_agent_stream(&id).await.unwrap_or_else(|e| {
859                log::error!("Failed to start agent stream: {}", e);
860            });
861        }
862        Ok(())
863    }
864
865    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
866        let mut observers = self.observers.lock().unwrap();
867        let observer_id = new_observer_id();
868        observers.insert(observer_id, observer);
869        observer_id
870    }
871
872    pub fn unsubscribe(&self, observer_id: usize) {
873        let mut observers = self.observers.lock().unwrap();
874        observers.swap_remove(&observer_id);
875    }
876
877    pub(crate) fn emit_agent_config_updated(
878        &self,
879        agent_id: String,
880        key: String,
881        value: AgentValue,
882    ) {
883        self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
884    }
885
886    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
887        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
888    }
889
890    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
891        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
892    }
893
894    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
895        self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
896    }
897
898    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
899        // ignore variables
900        if name.starts_with('%') {
901            return;
902        }
903        self.notify_observers(ASKitEvent::Board(name, value));
904    }
905
906    fn notify_observers(&self, event: ASKitEvent) {
907        let observers = self.observers.lock().unwrap();
908        for (_id, observer) in observers.iter() {
909            observer.notify(&event);
910        }
911    }
912}
913
914#[derive(Clone, Debug)]
915pub enum ASKitEvent {
916    AgentConfigUpdated(String, String, AgentValue), // (agent_id, key, value)
917    AgentError(String, String),                     // (agent_id, message)
918    AgentIn(String, String),                        // (agent_id, pin)
919    AgentSpecUpdated(String),                       // (agent_id)
920    Board(String, AgentValue),                      // (board name, value)
921}
922
923pub trait ASKitObserver {
924    fn notify(&self, event: &ASKitEvent);
925}
926
927static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
928
929fn new_observer_id() -> usize {
930    OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
931}
932
933// Agent Message
934
935#[derive(Clone)]
936pub enum AgentMessageSender {
937    Sync(std::sync::mpsc::Sender<AgentMessage>),
938    Async(mpsc::Sender<AgentMessage>),
939}