agent_stream_kit/
askit.rs

1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::{Mutex as AsyncMutex, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::message::{self, AgentEventMessage};
13use crate::registry;
14use crate::spec::{self, AgentSpec, AgentStreamSpec, ChannelSpec};
15use crate::stream::{AgentStream, AgentStreamInfo, AgentStreams};
16use crate::value::AgentValue;
17
18const MESSAGE_LIMIT: usize = 1024;
19
20#[derive(Clone)]
21pub struct ASKit {
22    // agent id -> agent
23    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
24
25    // agent id -> sender
26    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,
27
28    // board name -> [board out agent id]
29    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
30
31    // board name -> value
32    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
33
34    // source agent id -> [target agent id / source handle / target handle]
35    pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
36
37    // agent def name -> agent definition
38    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
39
40    // agent streams (stream id -> stream)
41    pub(crate) streams: Arc<Mutex<AgentStreams>>,
42
43    // agent def name -> config
44    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
45
46    // message sender
47    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
48
49    // observers
50    pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
51}
52
53impl ASKit {
54    pub fn new() -> Self {
55        Self {
56            agents: Default::default(),
57            agent_txs: Default::default(),
58            board_out_agents: Default::default(),
59            board_value: Default::default(),
60            channels: Default::default(),
61            defs: Default::default(),
62            streams: Default::default(),
63            global_configs_map: Default::default(),
64            tx: Arc::new(Mutex::new(None)),
65            observers: Default::default(),
66        }
67    }
68
69    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
70        self.tx
71            .lock()
72            .unwrap()
73            .clone()
74            .ok_or(AgentError::TxNotInitialized)
75    }
76
77    pub fn init() -> Result<Self, AgentError> {
78        let askit = Self::new();
79        askit.register_agents();
80        Ok(askit)
81    }
82
83    fn register_agents(&self) {
84        registry::register_inventory_agents(self);
85    }
86
87    pub async fn ready(&self) -> Result<(), AgentError> {
88        self.spawn_message_loop().await?;
89        self.start_agent_streams_on_start().await?;
90        Ok(())
91    }
92
93    pub fn quit(&self) {
94        let mut tx_lock = self.tx.lock().unwrap();
95        *tx_lock = None;
96    }
97
98    pub fn register_agent_definiton(&self, def: AgentDefinition) {
99        let def_name = def.name.clone();
100        let def_global_configs = def.global_configs.clone();
101
102        let mut defs = self.defs.lock().unwrap();
103        defs.insert(def.name.clone(), def);
104
105        // if there is a global config, set it
106        if let Some(def_global_configs) = def_global_configs {
107            let mut new_configs = AgentConfigs::default();
108            for (key, config_entry) in def_global_configs.iter() {
109                new_configs.set(key.clone(), config_entry.value.clone());
110            }
111            self.set_global_configs(def_name, new_configs);
112        }
113    }
114
115    pub fn get_agent_definitions(&self) -> AgentDefinitions {
116        let defs = self.defs.lock().unwrap();
117        defs.clone()
118    }
119
120    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
121        let defs = self.defs.lock().unwrap();
122        defs.get(def_name).cloned()
123    }
124
125    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
126        let defs = self.defs.lock().unwrap();
127        let Some(def) = defs.get(def_name) else {
128            return None;
129        };
130        def.configs.clone()
131    }
132
133    pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
134        let agents = self.agents.lock().unwrap();
135        let Some(agent) = agents.get(agent_id) else {
136            return None;
137        };
138        let agent = agent.blocking_lock();
139        Some(agent.spec().clone())
140    }
141
142    // streams
143
144    /// Get info of the agent stream by id.
145    pub fn get_agent_stream_info(&self, id: &str) -> Option<AgentStreamInfo> {
146        let streams = self.streams.lock().unwrap();
147        streams.get(id).map(|stream| stream.into())
148    }
149
150    /// Get infos of all agent streams.
151    pub fn get_agent_stream_infos(&self) -> Vec<AgentStreamInfo> {
152        let streams = self.streams.lock().unwrap();
153        streams.values().map(|s| s.into()).collect()
154    }
155
156    /// Get the agent stream spec by id.
157    pub fn get_agent_stream_spec(&self, id: &str) -> Option<AgentStreamSpec> {
158        let streams = self.streams.lock().unwrap();
159        streams.get(id).map(|stream| stream.spec().clone())
160    }
161
162    /// Set the agent stream spec by id.
163    pub fn set_agent_stream_spec(&self, id: &str, spec: AgentStreamSpec) -> Result<(), AgentError> {
164        let mut streams = self.streams.lock().unwrap();
165        let Some(stream) = streams.get_mut(id) else {
166            return Err(AgentError::StreamNotFound(id.to_string()));
167        };
168        *stream.spec_mut() = spec;
169        Ok(())
170    }
171
172    /// Create a new agent stream with the given name.
173    /// If the name already exists, a unique name will be generated by appending a number suffix.
174    /// Returns the id of the new agent stream.
175    pub fn new_agent_stream(&self, name: &str) -> Result<String, AgentError> {
176        if !is_valid_stream_name(name) {
177            return Err(AgentError::InvalidStreamName(name.into()));
178        }
179        let new_name = self.unique_stream_name(name);
180        let spec = AgentStreamSpec::default();
181        let id = self.add_agent_stream(new_name, spec)?;
182        Ok(id)
183    }
184
185    pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
186        if !is_valid_stream_name(new_name) {
187            return Err(AgentError::InvalidStreamName(new_name.into()));
188        }
189
190        // check if the new name is already used
191        let new_name = self.unique_stream_name(new_name);
192
193        let mut streams = self.streams.lock().unwrap();
194
195        // remove the original stream
196        let Some(mut stream) = streams.swap_remove(id) else {
197            return Err(AgentError::RenameStreamFailed(id.into()));
198        };
199
200        // insert renamed stream
201        stream.set_name(new_name.clone());
202        streams.insert(stream.id().to_string(), stream);
203        Ok(new_name)
204    }
205
206    pub fn unique_stream_name(&self, name: &str) -> String {
207        let mut new_name = name.trim().to_string();
208        let mut i = 2;
209        let streams = self.streams.lock().unwrap();
210        while streams.values().any(|stream| stream.name() == new_name) {
211            new_name = format!("{}{}", name, i);
212            i += 1;
213        }
214        new_name
215    }
216
217    pub fn add_agent_stream(
218        &self,
219        name: String,
220        spec: AgentStreamSpec,
221    ) -> Result<String, AgentError> {
222        let stream = AgentStream::new(name, spec);
223        let id = stream.id().to_string();
224
225        // add agents
226        for agent in &stream.spec().agents {
227            if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
228                log::error!("Failed to add_agent {}: {}", agent.id, e);
229            }
230        }
231
232        // add channels
233        for channel in &stream.spec().channels {
234            self.add_channel_internal(channel.clone())
235                .unwrap_or_else(|e| {
236                    log::error!("Failed to add_channel {}: {}", channel.source, e);
237                });
238        }
239
240        // add the given stream into streams
241        let mut streams = self.streams.lock().unwrap();
242        if streams.contains_key(&id) {
243            return Err(AgentError::DuplicateId(id.into()));
244        }
245        streams.insert(id.to_string(), stream);
246
247        Ok(id)
248    }
249
250    pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
251        let mut stream = {
252            let mut streams = self.streams.lock().unwrap();
253            let Some(stream) = streams.swap_remove(id) else {
254                return Err(AgentError::StreamNotFound(id.to_string()));
255            };
256            stream
257        };
258
259        stream.stop(self).await.unwrap_or_else(|e| {
260            log::error!("Failed to stop stream {}: {}", id, e);
261        });
262
263        // Remove all agents and channels associated with the stream
264        for agent in &stream.spec().agents {
265            self.remove_agent_internal(&agent.id)
266                .await
267                .unwrap_or_else(|e| {
268                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
269                });
270        }
271        for channel in &stream.spec().channels {
272            self.remove_channel_internal(channel);
273        }
274
275        Ok(())
276    }
277
278    pub fn copy_sub_stream(
279        &self,
280        agents: &Vec<AgentSpec>,
281        channels: &Vec<ChannelSpec>,
282    ) -> (Vec<AgentSpec>, Vec<ChannelSpec>) {
283        spec::copy_sub_stream(agents, channels)
284    }
285
286    pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
287        let mut stream = {
288            let mut streams = self.streams.lock().unwrap();
289            let Some(stream) = streams.swap_remove(id) else {
290                return Err(AgentError::StreamNotFound(id.to_string()));
291            };
292            stream
293        };
294
295        stream.start(self).await?;
296
297        let mut streams = self.streams.lock().unwrap();
298        streams.insert(id.to_string(), stream);
299        Ok(())
300    }
301
302    pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
303        let mut stream = {
304            let mut streams = self.streams.lock().unwrap();
305            let Some(stream) = streams.swap_remove(id) else {
306                return Err(AgentError::StreamNotFound(id.to_string()));
307            };
308            stream
309        };
310
311        stream.stop(self).await?;
312
313        let mut streams = self.streams.lock().unwrap();
314        streams.insert(id.to_string(), stream);
315        Ok(())
316    }
317
318    // Agents
319
320    /// Create a new agent spec from the given agent definition name.
321    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
322        let def = self
323            .get_agent_definition(def_name)
324            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
325        Ok(AgentSpec::from_def(&def))
326    }
327
328    /// Add an agent to the specified stream.
329    pub fn add_agent(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
330        let mut streams = self.streams.lock().unwrap();
331        let Some(stream) = streams.get_mut(&stream_id) else {
332            return Err(AgentError::StreamNotFound(stream_id.to_string()));
333        };
334        self.add_agent_internal(stream_id, spec.clone())?;
335        stream.spec_mut().add_agent(spec.clone());
336        Ok(())
337    }
338
339    fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
340        let mut agents = self.agents.lock().unwrap();
341        if agents.contains_key(&spec.id) {
342            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
343        }
344        let spec_id = spec.id.clone();
345        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
346        agent.set_stream_id(stream_id);
347        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
348        Ok(())
349    }
350
351    /// Get the agent by id.
352    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
353        let agents = self.agents.lock().unwrap();
354        agents.get(agent_id).cloned()
355    }
356
357    pub fn add_channel(&self, stream_id: &str, channel: ChannelSpec) -> Result<(), AgentError> {
358        let mut streams = self.streams.lock().unwrap();
359        let Some(stream) = streams.get_mut(stream_id) else {
360            return Err(AgentError::StreamNotFound(stream_id.to_string()));
361        };
362        stream.spec_mut().add_channels(channel.clone());
363        self.add_channel_internal(channel)?;
364        Ok(())
365    }
366
367    fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
368        // check if the source agent exists
369        {
370            let agents = self.agents.lock().unwrap();
371            if !agents.contains_key(&channel.source) {
372                return Err(AgentError::SourceAgentNotFound(channel.source.to_string()));
373            }
374        }
375
376        // check if handles are valid
377        if channel.source_handle.is_empty() {
378            return Err(AgentError::EmptySourceHandle);
379        }
380        if channel.target_handle.is_empty() {
381            return Err(AgentError::EmptyTargetHandle);
382        }
383
384        let mut channels = self.channels.lock().unwrap();
385        if let Some(targets) = channels.get_mut(&channel.source) {
386            if targets
387                .iter()
388                .any(|(target, source_handle, target_handle)| {
389                    *target == channel.target
390                        && *source_handle == channel.source_handle
391                        && *target_handle == channel.target_handle
392                })
393            {
394                return Err(AgentError::ChannelAlreadyExists);
395            }
396            targets.push((channel.target, channel.source_handle, channel.target_handle));
397        } else {
398            channels.insert(
399                channel.source,
400                vec![(channel.target, channel.source_handle, channel.target_handle)],
401            );
402        }
403        Ok(())
404    }
405
406    pub async fn remove_agent(&self, stream_id: &str, agent_id: &str) -> Result<(), AgentError> {
407        {
408            let mut streams = self.streams.lock().unwrap();
409            let Some(stream) = streams.get_mut(stream_id) else {
410                return Err(AgentError::StreamNotFound(stream_id.to_string()));
411            };
412            stream.spec_mut().remove_agent(agent_id);
413        }
414        if let Err(e) = self.remove_agent_internal(agent_id).await {
415            return Err(e);
416        }
417        Ok(())
418    }
419
420    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
421        self.stop_agent(agent_id).await?;
422
423        // remove from channels
424        {
425            let mut channels = self.channels.lock().unwrap();
426            let mut sources_to_remove = Vec::new();
427            for (source, targets) in channels.iter_mut() {
428                targets.retain(|(target, _, _)| target != agent_id);
429                if targets.is_empty() {
430                    sources_to_remove.push(source.clone());
431                }
432            }
433            for source in sources_to_remove {
434                channels.swap_remove(&source);
435            }
436            channels.swap_remove(agent_id);
437        }
438
439        // remove from agents
440        {
441            let mut agents = self.agents.lock().unwrap();
442            agents.swap_remove(agent_id);
443        }
444
445        Ok(())
446    }
447
448    pub fn remove_channel(&self, stream_id: &str, channel_id: &str) -> Result<(), AgentError> {
449        let mut stream = {
450            let mut streams = self.streams.lock().unwrap();
451            let Some(stream) = streams.swap_remove(stream_id) else {
452                return Err(AgentError::StreamNotFound(stream_id.to_string()));
453            };
454            stream
455        };
456
457        let Some(channel) = stream.spec_mut().remove_channel(channel_id) else {
458            let mut streams = self.streams.lock().unwrap();
459            streams.insert(stream_id.to_string(), stream);
460            return Err(AgentError::ChannelNotFound(channel_id.to_string()));
461        };
462        let mut streams = self.streams.lock().unwrap();
463        streams.insert(stream_id.to_string(), stream);
464
465        self.remove_channel_internal(&channel);
466        Ok(())
467    }
468
469    fn remove_channel_internal(&self, channel: &ChannelSpec) {
470        let mut channels = self.channels.lock().unwrap();
471        if let Some(targets) = channels.get_mut(&channel.source) {
472            targets.retain(|(target, source_handle, target_handle)| {
473                *target != channel.target
474                    || *source_handle != channel.source_handle
475                    || *target_handle != channel.target_handle
476            });
477            if targets.is_empty() {
478                channels.swap_remove(&channel.source);
479            }
480        }
481    }
482
483    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
484        let agent = {
485            let agents = self.agents.lock().unwrap();
486            let Some(a) = agents.get(agent_id) else {
487                return Err(AgentError::AgentNotFound(agent_id.to_string()));
488            };
489            a.clone()
490        };
491        let def_name = {
492            let agent = agent.lock().await;
493            agent.def_name().to_string()
494        };
495        let uses_native_thread = {
496            let defs = self.defs.lock().unwrap();
497            let Some(def) = defs.get(&def_name) else {
498                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
499            };
500            def.native_thread
501        };
502        let agent_status = {
503            let agent = agent.lock().await;
504            agent.status().clone()
505        };
506        if agent_status == AgentStatus::Init {
507            log::info!("Starting agent {}", agent_id);
508
509            if uses_native_thread {
510                let (tx, rx) = std::sync::mpsc::channel();
511
512                {
513                    let mut agent_txs = self.agent_txs.lock().unwrap();
514                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
515                };
516
517                let agent_id = agent_id.to_string();
518                std::thread::spawn(async move || {
519                    if let Err(e) = agent.lock().await.start().await {
520                        log::error!("Failed to start agent {}: {}", agent_id, e);
521                    }
522
523                    while let Ok(message) = rx.recv() {
524                        match message {
525                            AgentMessage::Input { ctx, pin, value } => {
526                                agent
527                                    .lock()
528                                    .await
529                                    .process(ctx, pin, value)
530                                    .await
531                                    .unwrap_or_else(|e| {
532                                        log::error!("Process Error {}: {}", agent_id, e);
533                                    });
534                            }
535                            AgentMessage::Config { configs } => {
536                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
537                                    log::error!("Config Error {}: {}", agent_id, e);
538                                });
539                            }
540                            AgentMessage::Stop => {
541                                break;
542                            }
543                        }
544                    }
545                });
546            } else {
547                let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
548
549                {
550                    let mut agent_txs = self.agent_txs.lock().unwrap();
551                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
552                };
553
554                let agent_id = agent_id.to_string();
555                tokio::spawn(async move {
556                    {
557                        let mut agent_guard = agent.lock().await;
558                        if let Err(e) = agent_guard.start().await {
559                            log::error!("Failed to start agent {}: {}", agent_id, e);
560                        }
561                    }
562
563                    while let Some(message) = rx.recv().await {
564                        match message {
565                            AgentMessage::Input { ctx, pin, value } => {
566                                agent
567                                    .lock()
568                                    .await
569                                    .process(ctx, pin, value)
570                                    .await
571                                    .unwrap_or_else(|e| {
572                                        log::error!("Process Error {}: {}", agent_id, e);
573                                    });
574                            }
575                            AgentMessage::Config { configs } => {
576                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
577                                    log::error!("Config Error {}: {}", agent_id, e);
578                                });
579                            }
580                            AgentMessage::Stop => {
581                                rx.close();
582                                return;
583                            }
584                        }
585                    }
586                });
587                tokio::task::yield_now().await;
588            }
589        }
590        Ok(())
591    }
592
593    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
594        let agent = {
595            let agents = self.agents.lock().unwrap();
596            let Some(a) = agents.get(agent_id) else {
597                return Err(AgentError::AgentNotFound(agent_id.to_string()));
598            };
599            a.clone()
600        };
601
602        let agent_status = {
603            let agent = agent.lock().await;
604            agent.status().clone()
605        };
606        if agent_status == AgentStatus::Start {
607            log::info!("Stopping agent {}", agent_id);
608
609            {
610                let mut agent_txs = self.agent_txs.lock().unwrap();
611                if let Some(tx) = agent_txs.swap_remove(agent_id) {
612                    match tx {
613                        AgentMessageSender::Sync(tx) => {
614                            tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
615                                log::error!(
616                                    "Failed to send stop message to agent {}: {}",
617                                    agent_id,
618                                    e
619                                );
620                            });
621                        }
622                        AgentMessageSender::Async(tx) => {
623                            tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
624                                log::error!(
625                                    "Failed to send stop message to agent {}: {}",
626                                    agent_id,
627                                    e
628                                );
629                            });
630                        }
631                    }
632                }
633            }
634
635            agent.lock().await.stop().await?;
636        }
637
638        Ok(())
639    }
640
641    pub async fn set_agent_configs(
642        &self,
643        agent_id: String,
644        configs: AgentConfigs,
645    ) -> Result<(), AgentError> {
646        let agent = {
647            let agents = self.agents.lock().unwrap();
648            let Some(a) = agents.get(&agent_id) else {
649                return Err(AgentError::AgentNotFound(agent_id.to_string()));
650            };
651            a.clone()
652        };
653
654        let agent_status = {
655            let agent = agent.lock().await;
656            agent.status().clone()
657        };
658        if agent_status == AgentStatus::Init {
659            agent.lock().await.set_configs(configs.clone())?;
660        } else if agent_status == AgentStatus::Start {
661            let tx = {
662                let agent_txs = self.agent_txs.lock().unwrap();
663                let Some(tx) = agent_txs.get(&agent_id) else {
664                    return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
665                };
666                tx.clone()
667            };
668            let message = AgentMessage::Config { configs };
669            match tx {
670                AgentMessageSender::Sync(tx) => {
671                    tx.send(message).map_err(|_| {
672                        AgentError::SendMessageFailed("Failed to send config message".to_string())
673                    })?;
674                }
675                AgentMessageSender::Async(tx) => {
676                    tx.send(message).await.map_err(|_| {
677                        AgentError::SendMessageFailed("Failed to send config message".to_string())
678                    })?;
679                }
680            }
681        }
682        Ok(())
683    }
684
685    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
686        let global_configs_map = self.global_configs_map.lock().unwrap();
687        global_configs_map.get(def_name).cloned()
688    }
689
690    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
691        let mut global_configs_map = self.global_configs_map.lock().unwrap();
692
693        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
694            global_configs_map.insert(def_name, configs);
695            return;
696        };
697
698        for (key, value) in configs {
699            existing_configs.set(key, value);
700        }
701    }
702
703    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
704        let global_configs_map = self.global_configs_map.lock().unwrap();
705        global_configs_map.clone()
706    }
707
708    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
709        for (agent_name, new_configs) in new_configs_map {
710            self.set_global_configs(agent_name, new_configs);
711        }
712    }
713
714    pub async fn agent_input(
715        &self,
716        agent_id: String,
717        ctx: AgentContext,
718        pin: String,
719        value: AgentValue,
720    ) -> Result<(), AgentError> {
721        let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
722            let agents = self.agents.lock().unwrap();
723            let Some(a) = agents.get(&agent_id) else {
724                return Err(AgentError::AgentNotFound(agent_id.to_string()));
725            };
726            a.clone()
727        };
728
729        let agent_status = {
730            let agent = agent.lock().await;
731            agent.status().clone()
732        };
733        if agent_status != AgentStatus::Start {
734            return Ok(());
735        }
736
737        if pin.starts_with("config:") {
738            let config_key = pin[7..].to_string();
739            let mut agent = agent.lock().await;
740            agent.set_config(config_key.clone(), value.clone())?;
741            return Ok(());
742        }
743
744        let message = AgentMessage::Input {
745            ctx,
746            pin: pin.clone(),
747            value,
748        };
749
750        let tx = {
751            let agent_txs = self.agent_txs.lock().unwrap();
752            let Some(tx) = agent_txs.get(&agent_id) else {
753                return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
754            };
755            tx.clone()
756        };
757        match tx {
758            AgentMessageSender::Sync(tx) => {
759                tx.send(message).map_err(|_| {
760                    AgentError::SendMessageFailed("Failed to send input message".to_string())
761                })?;
762            }
763            AgentMessageSender::Async(tx) => {
764                tx.send(message).await.map_err(|_| {
765                    AgentError::SendMessageFailed("Failed to send input message".to_string())
766                })?;
767            }
768        }
769
770        self.emit_agent_input(agent_id.to_string(), pin);
771
772        Ok(())
773    }
774
775    pub async fn send_agent_out(
776        &self,
777        agent_id: String,
778        ctx: AgentContext,
779        pin: String,
780        value: AgentValue,
781    ) -> Result<(), AgentError> {
782        message::send_agent_out(self, agent_id, ctx, pin, value).await
783    }
784
785    pub fn try_send_agent_out(
786        &self,
787        agent_id: String,
788        ctx: AgentContext,
789        pin: String,
790        value: AgentValue,
791    ) -> Result<(), AgentError> {
792        message::try_send_agent_out(self, agent_id, ctx, pin, value)
793    }
794
795    pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
796        self.try_send_board_out(name, AgentContext::new(), value)
797    }
798
799    pub fn write_var_value(
800        &self,
801        stream_id: &str,
802        name: &str,
803        value: AgentValue,
804    ) -> Result<(), AgentError> {
805        let var_name = format!("%{}/{}", stream_id, name);
806        self.try_send_board_out(var_name, AgentContext::new(), value)
807    }
808
809    pub(crate) fn try_send_board_out(
810        &self,
811        name: String,
812        ctx: AgentContext,
813        value: AgentValue,
814    ) -> Result<(), AgentError> {
815        message::try_send_board_out(self, name, ctx, value)
816    }
817
818    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
819        // TODO: settings for the channel size
820        let (tx, mut rx) = mpsc::channel(4096);
821        {
822            let mut tx_lock = self.tx.lock().unwrap();
823            *tx_lock = Some(tx);
824        }
825
826        // spawn the main loop
827        let askit = self.clone();
828        tokio::spawn(async move {
829            while let Some(message) = rx.recv().await {
830                use AgentEventMessage::*;
831
832                match message {
833                    AgentOut {
834                        agent,
835                        ctx,
836                        pin,
837                        value,
838                    } => {
839                        message::agent_out(&askit, agent, ctx, pin, value).await;
840                    }
841                    BoardOut { name, ctx, value } => {
842                        message::board_out(&askit, name, ctx, value).await;
843                    }
844                }
845            }
846        });
847
848        tokio::task::yield_now().await;
849
850        Ok(())
851    }
852
853    async fn start_agent_streams_on_start(&self) -> Result<(), AgentError> {
854        let run_on_start_stream_ids;
855        {
856            let agent_streams = self.streams.lock().unwrap();
857            run_on_start_stream_ids = agent_streams
858                .values()
859                .filter(|s| s.spec().run_on_start)
860                .map(|s| s.id().to_string())
861                .collect::<Vec<_>>();
862        }
863
864        for id in run_on_start_stream_ids {
865            self.start_agent_stream(&id).await.unwrap_or_else(|e| {
866                log::error!("Failed to start agent stream: {}", e);
867            });
868        }
869        Ok(())
870    }
871
872    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
873        let mut observers = self.observers.lock().unwrap();
874        let observer_id = new_observer_id();
875        observers.insert(observer_id, observer);
876        observer_id
877    }
878
879    pub fn unsubscribe(&self, observer_id: usize) {
880        let mut observers = self.observers.lock().unwrap();
881        observers.swap_remove(&observer_id);
882    }
883
884    pub(crate) fn emit_agent_config_updated(
885        &self,
886        agent_id: String,
887        key: String,
888        value: AgentValue,
889    ) {
890        self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
891    }
892
893    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
894        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
895    }
896
897    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
898        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
899    }
900
901    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
902        self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
903    }
904
905    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
906        // // ignore variables
907        // if name.starts_with('%') {
908        //     return;
909        // }
910        self.notify_observers(ASKitEvent::Board(name, value));
911    }
912
913    fn notify_observers(&self, event: ASKitEvent) {
914        let observers = self.observers.lock().unwrap();
915        for (_id, observer) in observers.iter() {
916            observer.notify(&event);
917        }
918    }
919}
920
921fn is_valid_stream_name(new_name: &str) -> bool {
922    // Check if the name is empty
923    if new_name.trim().is_empty() {
924        return false;
925    }
926
927    // Checks for path-like names:
928    if new_name.contains('/') {
929        // Disallow leading, trailing, or consecutive slashes
930        if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
931            return false;
932        }
933        // Disallow segments that are "." or ".."
934        if new_name
935            .split('/')
936            .any(|segment| segment == "." || segment == "..")
937        {
938            return false;
939        }
940    }
941
942    // Check if the name contains invalid characters
943    let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
944    for c in invalid_chars {
945        if new_name.contains(c) {
946            return false;
947        }
948    }
949
950    true
951}
952
953#[derive(Clone, Debug)]
954pub enum ASKitEvent {
955    AgentConfigUpdated(String, String, AgentValue), // (agent_id, key, value)
956    AgentError(String, String),                     // (agent_id, message)
957    AgentIn(String, String),                        // (agent_id, pin)
958    AgentSpecUpdated(String),                       // (agent_id)
959    Board(String, AgentValue),                      // (board name, value)
960}
961
962pub trait ASKitObserver {
963    fn notify(&self, event: &ASKitEvent);
964}
965
966static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
967
968fn new_observer_id() -> usize {
969    OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
970}
971
972// Agent Message
973
974#[derive(Clone)]
975pub enum AgentMessageSender {
976    Sync(std::sync::mpsc::Sender<AgentMessage>),
977    Async(mpsc::Sender<AgentMessage>),
978}