agent_stream_kit/
askit.rs

1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use im::Vector;
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::FnvIndexMap;
8use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::id::{new_id, update_ids};
14use crate::message::{self, AgentEventMessage};
15use crate::registry;
16use crate::spec::{AgentSpec, AgentStreamSpec, ChannelSpec};
17use crate::stream::{AgentStream, AgentStreamInfo, AgentStreams};
18use crate::value::AgentValue;
19
/// Capacity (1024 messages) of the bounded `tokio::sync::mpsc` channel that is
/// created for every agent running as an async task.
const MESSAGE_LIMIT: usize = 1 << 10;
21
22#[derive(Clone)]
23pub struct ASKit {
24    // agent id -> agent
25    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
26
27    // agent id -> sender
28    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,
29
30    // board name -> [board out agent id]
31    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
32
33    // board name -> value
34    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
35
36    // source agent id -> [target agent id / source handle / target handle]
37    pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
38
39    // agent def name -> agent definition
40    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
41
42    // agent streams (stream id -> stream)
43    pub(crate) streams: Arc<Mutex<AgentStreams>>,
44
45    // agent def name -> config
46    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
47
48    // message sender
49    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
50
51    // observers
52    pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
53}
54
55impl ASKit {
56    pub fn new() -> Self {
57        Self {
58            agents: Default::default(),
59            agent_txs: Default::default(),
60            board_out_agents: Default::default(),
61            board_value: Default::default(),
62            channels: Default::default(),
63            defs: Default::default(),
64            streams: Default::default(),
65            global_configs_map: Default::default(),
66            tx: Arc::new(Mutex::new(None)),
67            observers: Default::default(),
68        }
69    }
70
71    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
72        self.tx
73            .lock()
74            .unwrap()
75            .clone()
76            .ok_or(AgentError::TxNotInitialized)
77    }
78
79    /// Initialize ASKit.
80    pub fn init() -> Result<Self, AgentError> {
81        let askit = Self::new();
82        askit.register_agents();
83        Ok(askit)
84    }
85
    /// Registers agents by delegating to `registry::register_inventory_agents`,
    /// passing this instance as the target.
    fn register_agents(&self) {
        registry::register_inventory_agents(self);
    }
89
    /// Prepare ASKit to be ready.
    ///
    /// Spawns the message loop first and then calls
    /// `start_agent_streams_on_start`; the first error from either step is
    /// returned and aborts the sequence.
    pub async fn ready(&self) -> Result<(), AgentError> {
        self.spawn_message_loop().await?;
        self.start_agent_streams_on_start().await?;
        Ok(())
    }
96
97    /// Quit ASKit.
98    pub fn quit(&self) {
99        let mut tx_lock = self.tx.lock().unwrap();
100        *tx_lock = None;
101    }
102
103    /// Register an agent definition.
104    pub fn register_agent_definiton(&self, def: AgentDefinition) {
105        let def_name = def.name.clone();
106        let def_global_configs = def.global_configs.clone();
107
108        let mut defs = self.defs.lock().unwrap();
109        defs.insert(def.name.clone(), def);
110
111        // if there is a global config, set it
112        if let Some(def_global_configs) = def_global_configs {
113            let mut new_configs = AgentConfigs::default();
114            for (key, config_entry) in def_global_configs.iter() {
115                new_configs.set(key.clone(), config_entry.value.clone());
116            }
117            self.set_global_configs(def_name, new_configs);
118        }
119    }
120
121    /// Get all agent definitions.
122    pub fn get_agent_definitions(&self) -> AgentDefinitions {
123        let defs = self.defs.lock().unwrap();
124        defs.clone()
125    }
126
127    /// Get an agent definition by name.
128    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
129        let defs = self.defs.lock().unwrap();
130        defs.get(def_name).cloned()
131    }
132
133    /// Get the config specs of an agent definition by name.
134    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
135        let defs = self.defs.lock().unwrap();
136        let Some(def) = defs.get(def_name) else {
137            return None;
138        };
139        def.configs.clone()
140    }
141
142    /// Get the agent spec by id.
143    pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
144        let agents = self.agents.lock().unwrap();
145        let Some(agent) = agents.get(agent_id) else {
146            return None;
147        };
148        let agent = agent.blocking_lock();
149        Some(agent.spec().clone())
150    }
151
152    // streams
153
154    /// Get info of the agent stream by id.
155    pub fn get_agent_stream_info(&self, id: &str) -> Option<AgentStreamInfo> {
156        let streams = self.streams.lock().unwrap();
157        streams.get(id).map(|stream| stream.into())
158    }
159
160    /// Get infos of all agent streams.
161    pub fn get_agent_stream_infos(&self) -> Vec<AgentStreamInfo> {
162        let streams = self.streams.lock().unwrap();
163        streams.values().map(|s| s.into()).collect()
164    }
165
166    /// Get the agent stream spec by id.
167    pub fn get_agent_stream_spec(&self, id: &str) -> Option<AgentStreamSpec> {
168        let streams = self.streams.lock().unwrap();
169        streams.get(id).map(|stream| stream.spec().clone())
170    }
171
172    /// Set the agent stream spec by id.
173    pub fn set_agent_stream_spec(&self, id: &str, spec: AgentStreamSpec) -> Result<(), AgentError> {
174        let mut streams = self.streams.lock().unwrap();
175        let Some(stream) = streams.get_mut(id) else {
176            return Err(AgentError::StreamNotFound(id.to_string()));
177        };
178        *stream.spec_mut() = spec;
179        Ok(())
180    }
181
182    /// Create a new agent stream with the given name.
183    /// If the name already exists, a unique name will be generated by appending a number suffix.
184    /// Returns the id of the new agent stream.
185    pub fn new_agent_stream(&self, name: &str) -> Result<String, AgentError> {
186        if !is_valid_stream_name(name) {
187            return Err(AgentError::InvalidStreamName(name.into()));
188        }
189        let new_name = self.unique_stream_name(name);
190        let spec = AgentStreamSpec::default();
191        let id = self.add_agent_stream(new_name, spec)?;
192        Ok(id)
193    }
194
195    /// Rename an existing agent stream.
196    pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
197        if !is_valid_stream_name(new_name) {
198            return Err(AgentError::InvalidStreamName(new_name.into()));
199        }
200
201        // check if the new name is already used
202        let new_name = self.unique_stream_name(new_name);
203
204        let mut streams = self.streams.lock().unwrap();
205
206        // remove the original stream
207        let Some(mut stream) = streams.swap_remove(id) else {
208            return Err(AgentError::RenameStreamFailed(id.into()));
209        };
210
211        // insert renamed stream
212        stream.set_name(new_name.clone());
213        streams.insert(stream.id().to_string(), stream);
214        Ok(new_name)
215    }
216
217    /// Generate a unique stream name by appending a number suffix if needed.
218    pub fn unique_stream_name(&self, name: &str) -> String {
219        let mut new_name = name.trim().to_string();
220        let mut i = 2;
221        let streams = self.streams.lock().unwrap();
222        while streams.values().any(|stream| stream.name() == new_name) {
223            new_name = format!("{}{}", name, i);
224            i += 1;
225        }
226        new_name
227    }
228
229    /// Add a new agent stream with the given name and spec, and returns the id of the new agent stream.
230    ///
231    /// The ids of the given spec, including agents and channels, are changed to new unique ids.
232    pub fn add_agent_stream(
233        &self,
234        name: String,
235        spec: AgentStreamSpec,
236    ) -> Result<String, AgentError> {
237        let stream = AgentStream::new(name, spec);
238        let id = stream.id().to_string();
239
240        // add agents
241        for agent in &stream.spec().agents {
242            if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
243                log::error!("Failed to add_agent {}: {}", agent.id, e);
244            }
245        }
246
247        // add channels
248        for channel in &stream.spec().channels {
249            self.add_channel_internal(channel.clone())
250                .unwrap_or_else(|e| {
251                    log::error!("Failed to add_channel {}: {}", channel.source, e);
252                });
253        }
254
255        // add the given stream into streams
256        let mut streams = self.streams.lock().unwrap();
257        if streams.contains_key(&id) {
258            return Err(AgentError::DuplicateId(id.into()));
259        }
260        streams.insert(id.to_string(), stream);
261
262        Ok(id)
263    }
264
265    /// Remove an agent stream by id.
266    pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
267        let mut stream = {
268            let mut streams = self.streams.lock().unwrap();
269            let Some(stream) = streams.swap_remove(id) else {
270                return Err(AgentError::StreamNotFound(id.to_string()));
271            };
272            stream
273        };
274
275        stream.stop(self).await.unwrap_or_else(|e| {
276            log::error!("Failed to stop stream {}: {}", id, e);
277        });
278
279        // Remove all agents and channels associated with the stream
280        for agent in &stream.spec().agents {
281            self.remove_agent_internal(&agent.id)
282                .await
283                .unwrap_or_else(|e| {
284                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
285                });
286        }
287        for channel in &stream.spec().channels {
288            self.remove_channel_internal(channel);
289        }
290
291        Ok(())
292    }
293
294    /// Start an agent stream by id.
295    pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
296        let mut stream = {
297            let mut streams = self.streams.lock().unwrap();
298            let Some(stream) = streams.swap_remove(id) else {
299                return Err(AgentError::StreamNotFound(id.to_string()));
300            };
301            stream
302        };
303
304        stream.start(self).await?;
305
306        let mut streams = self.streams.lock().unwrap();
307        streams.insert(id.to_string(), stream);
308        Ok(())
309    }
310
311    /// Stop an agent stream by id.
312    pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
313        let mut stream = {
314            let mut streams = self.streams.lock().unwrap();
315            let Some(stream) = streams.swap_remove(id) else {
316                return Err(AgentError::StreamNotFound(id.to_string()));
317            };
318            stream
319        };
320
321        stream.stop(self).await?;
322
323        let mut streams = self.streams.lock().unwrap();
324        streams.insert(id.to_string(), stream);
325        Ok(())
326    }
327
328    // Agents
329
330    /// Create a new agent spec from the given agent definition name.
331    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
332        let def = self
333            .get_agent_definition(def_name)
334            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
335        Ok(def.to_spec())
336    }
337
338    /// Add an agent to the specified stream, and returns the id of the newly added agent.
339    pub fn add_agent(&self, stream_id: String, mut spec: AgentSpec) -> Result<String, AgentError> {
340        let mut streams = self.streams.lock().unwrap();
341        let Some(stream) = streams.get_mut(&stream_id) else {
342            return Err(AgentError::StreamNotFound(stream_id.to_string()));
343        };
344        let id = new_id();
345        spec.id = id.clone();
346        self.add_agent_internal(stream_id, spec.clone())?;
347        stream.spec_mut().add_agent(spec.clone());
348        Ok(id)
349    }
350
351    fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
352        let mut agents = self.agents.lock().unwrap();
353        if agents.contains_key(&spec.id) {
354            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
355        }
356        let spec_id = spec.id.clone();
357        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
358        agent.set_stream_id(stream_id);
359        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
360        Ok(())
361    }
362
363    /// Get the agent by id.
364    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
365        let agents = self.agents.lock().unwrap();
366        agents.get(agent_id).cloned()
367    }
368
369    /// Add a channel to the specified stream.
370    pub fn add_channel(&self, stream_id: &str, channel: ChannelSpec) -> Result<(), AgentError> {
371        // check if the source and target agents exist
372        {
373            let agents = self.agents.lock().unwrap();
374            if !agents.contains_key(&channel.source) {
375                return Err(AgentError::AgentNotFound(channel.source.to_string()));
376            }
377            if !agents.contains_key(&channel.target) {
378                return Err(AgentError::AgentNotFound(channel.target.to_string()));
379            }
380        }
381
382        // check if handles are valid
383        if channel.source_handle.is_empty() {
384            return Err(AgentError::EmptySourceHandle);
385        }
386        if channel.target_handle.is_empty() {
387            return Err(AgentError::EmptyTargetHandle);
388        }
389
390        let mut streams = self.streams.lock().unwrap();
391        let Some(stream) = streams.get_mut(stream_id) else {
392            return Err(AgentError::StreamNotFound(stream_id.to_string()));
393        };
394        stream.spec_mut().add_channels(channel.clone());
395        self.add_channel_internal(channel)?;
396        Ok(())
397    }
398
399    fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
400        let mut channels = self.channels.lock().unwrap();
401        if let Some(targets) = channels.get_mut(&channel.source) {
402            if targets
403                .iter()
404                .any(|(target, source_handle, target_handle)| {
405                    *target == channel.target
406                        && *source_handle == channel.source_handle
407                        && *target_handle == channel.target_handle
408                })
409            {
410                return Err(AgentError::ChannelAlreadyExists);
411            }
412            targets.push((channel.target, channel.source_handle, channel.target_handle));
413        } else {
414            channels.insert(
415                channel.source,
416                vec![(channel.target, channel.source_handle, channel.target_handle)],
417            );
418        }
419        Ok(())
420    }
421
422    /// Add agents and channels to the specified stream.
423    ///
424    /// The ids of the given agents and channels are changed to new unique ids.
425    pub fn add_agents_and_channels(
426        &self,
427        stream_id: &str,
428        agents: &Vector<AgentSpec>,
429        channels: &Vector<ChannelSpec>,
430    ) -> Result<(Vector<AgentSpec>, Vector<ChannelSpec>), AgentError> {
431        let (agents, channels) = update_ids(agents, channels);
432
433        let mut streams = self.streams.lock().unwrap();
434        let Some(stream) = streams.get_mut(stream_id) else {
435            return Err(AgentError::StreamNotFound(stream_id.to_string()));
436        };
437
438        for agent in &agents {
439            self.add_agent_internal(stream_id.to_string(), agent.clone())?;
440            stream.spec_mut().add_agent(agent.clone());
441        }
442
443        for channel in &channels {
444            self.add_channel_internal(channel.clone())?;
445            stream.spec_mut().add_channels(channel.clone());
446        }
447
448        Ok((agents, channels))
449    }
450
451    /// Remove an agent from the specified stream.
452    pub async fn remove_agent(&self, stream_id: &str, agent_id: &str) -> Result<(), AgentError> {
453        {
454            let mut streams = self.streams.lock().unwrap();
455            let Some(stream) = streams.get_mut(stream_id) else {
456                return Err(AgentError::StreamNotFound(stream_id.to_string()));
457            };
458            stream.spec_mut().remove_agent(agent_id);
459        }
460        if let Err(e) = self.remove_agent_internal(agent_id).await {
461            return Err(e);
462        }
463        Ok(())
464    }
465
466    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
467        self.stop_agent(agent_id).await?;
468
469        // remove from channels
470        {
471            let mut channels = self.channels.lock().unwrap();
472            let mut sources_to_remove = Vec::new();
473            for (source, targets) in channels.iter_mut() {
474                targets.retain(|(target, _, _)| target != agent_id);
475                if targets.is_empty() {
476                    sources_to_remove.push(source.clone());
477                }
478            }
479            for source in sources_to_remove {
480                channels.swap_remove(&source);
481            }
482            channels.swap_remove(agent_id);
483        }
484
485        // remove from agents
486        {
487            let mut agents = self.agents.lock().unwrap();
488            agents.swap_remove(agent_id);
489        }
490
491        Ok(())
492    }
493
494    /// Remove a channel from the specified stream.
495    pub fn remove_channel(&self, stream_id: &str, channel: &ChannelSpec) -> Result<(), AgentError> {
496        let mut stream = {
497            let mut streams = self.streams.lock().unwrap();
498            let Some(stream) = streams.swap_remove(stream_id) else {
499                return Err(AgentError::StreamNotFound(stream_id.to_string()));
500            };
501            stream
502        };
503
504        let Some(channel) = stream.spec_mut().remove_channel(channel) else {
505            let mut streams = self.streams.lock().unwrap();
506            streams.insert(stream_id.to_string(), stream);
507            return Err(AgentError::ChannelNotFound(format!(
508                "{}:{}->{}:{}",
509                channel.source, channel.source_handle, channel.target, channel.target_handle
510            )));
511        };
512        let mut streams = self.streams.lock().unwrap();
513        streams.insert(stream_id.to_string(), stream);
514
515        self.remove_channel_internal(&channel);
516        Ok(())
517    }
518
519    fn remove_channel_internal(&self, channel: &ChannelSpec) {
520        let mut channels = self.channels.lock().unwrap();
521        if let Some(targets) = channels.get_mut(&channel.source) {
522            targets.retain(|(target, source_handle, target_handle)| {
523                *target != channel.target
524                    || *source_handle != channel.source_handle
525                    || *target_handle != channel.target_handle
526            });
527            if targets.is_empty() {
528                channels.swap_remove(&channel.source);
529            }
530        }
531    }
532
533    /// Start an agent by id.
534    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
535        let agent = {
536            let agents = self.agents.lock().unwrap();
537            let Some(a) = agents.get(agent_id) else {
538                return Err(AgentError::AgentNotFound(agent_id.to_string()));
539            };
540            a.clone()
541        };
542        let def_name = {
543            let agent = agent.lock().await;
544            agent.def_name().to_string()
545        };
546        let uses_native_thread = {
547            let defs = self.defs.lock().unwrap();
548            let Some(def) = defs.get(&def_name) else {
549                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
550            };
551            def.native_thread
552        };
553        let agent_status = {
554            // This will not block since the agent is not started yet.
555            let agent = agent.lock().await;
556            agent.status().clone()
557        };
558        if agent_status == AgentStatus::Init {
559            log::info!("Starting agent {}", agent_id);
560
561            if uses_native_thread {
562                let (tx, rx) = std::sync::mpsc::channel();
563
564                {
565                    let mut agent_txs = self.agent_txs.lock().unwrap();
566                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
567                };
568
569                let agent_id = agent_id.to_string();
570                std::thread::spawn(async move || {
571                    if let Err(e) = agent.lock().await.start().await {
572                        log::error!("Failed to start agent {}: {}", agent_id, e);
573                    }
574
575                    while let Ok(message) = rx.recv() {
576                        match message {
577                            AgentMessage::Input { ctx, pin, value } => {
578                                agent
579                                    .lock()
580                                    .await
581                                    .process(ctx, pin, value)
582                                    .await
583                                    .unwrap_or_else(|e| {
584                                        log::error!("Process Error {}: {}", agent_id, e);
585                                    });
586                            }
587                            AgentMessage::Config { key, value } => {
588                                agent
589                                    .lock()
590                                    .await
591                                    .set_config(key, value)
592                                    .unwrap_or_else(|e| {
593                                        log::error!("Config Error {}: {}", agent_id, e);
594                                    });
595                            }
596                            AgentMessage::Configs { configs } => {
597                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
598                                    log::error!("Configs Error {}: {}", agent_id, e);
599                                });
600                            }
601                            AgentMessage::Stop => {
602                                break;
603                            }
604                        }
605                    }
606                });
607            } else {
608                let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
609
610                {
611                    let mut agent_txs = self.agent_txs.lock().unwrap();
612                    agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
613                };
614
615                let agent_id = agent_id.to_string();
616                tokio::spawn(async move {
617                    {
618                        let mut agent_guard = agent.lock().await;
619                        if let Err(e) = agent_guard.start().await {
620                            log::error!("Failed to start agent {}: {}", agent_id, e);
621                        }
622                    }
623
624                    while let Some(message) = rx.recv().await {
625                        match message {
626                            AgentMessage::Input { ctx, pin, value } => {
627                                agent
628                                    .lock()
629                                    .await
630                                    .process(ctx, pin, value)
631                                    .await
632                                    .unwrap_or_else(|e| {
633                                        log::error!("Process Error {}: {}", agent_id, e);
634                                    });
635                            }
636                            AgentMessage::Config { key, value } => {
637                                agent
638                                    .lock()
639                                    .await
640                                    .set_config(key, value)
641                                    .unwrap_or_else(|e| {
642                                        log::error!("Config Error {}: {}", agent_id, e);
643                                    });
644                            }
645                            AgentMessage::Configs { configs } => {
646                                agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
647                                    log::error!("Configs Error {}: {}", agent_id, e);
648                                });
649                            }
650                            AgentMessage::Stop => {
651                                rx.close();
652                                return;
653                            }
654                        }
655                    }
656                });
657                tokio::task::yield_now().await;
658            }
659        }
660        Ok(())
661    }
662
663    /// Stop an agent by id.
664    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
665        {
666            // remove the sender first to prevent new messages being sent
667            let mut agent_txs = self.agent_txs.lock().unwrap();
668            if let Some(tx) = agent_txs.swap_remove(agent_id) {
669                match tx {
670                    AgentMessageSender::Sync(tx) => {
671                        tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
672                            log::error!("Failed to send stop message to agent {}: {}", agent_id, e);
673                        });
674                    }
675                    AgentMessageSender::Async(tx) => {
676                        tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
677                            log::error!("Failed to send stop message to agent {}: {}", agent_id, e);
678                        });
679                    }
680                }
681            }
682        }
683
684        let agent = {
685            let agents = self.agents.lock().unwrap();
686            let Some(a) = agents.get(agent_id) else {
687                return Err(AgentError::AgentNotFound(agent_id.to_string()));
688            };
689            a.clone()
690        };
691        let mut agent_guard = agent.lock().await;
692        if *agent_guard.status() == AgentStatus::Start {
693            log::info!("Stopping agent {}", agent_id);
694            agent_guard.stop().await?;
695        }
696
697        Ok(())
698    }
699
700    /// Set configs for an agent by id.
701    pub async fn set_agent_configs(
702        &self,
703        agent_id: String,
704        configs: AgentConfigs,
705    ) -> Result<(), AgentError> {
706        let tx = {
707            let agent_txs = self.agent_txs.lock().unwrap();
708            agent_txs.get(&agent_id).cloned()
709        };
710
711        let Some(tx) = tx else {
712            // The agent is not running. We can set the configs directly.
713            let agent = {
714                let agents = self.agents.lock().unwrap();
715                let Some(a) = agents.get(&agent_id) else {
716                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
717                };
718                a.clone()
719            };
720            agent.lock().await.set_configs(configs.clone())?;
721            return Ok(());
722        };
723        let message = AgentMessage::Configs { configs };
724        match tx {
725            AgentMessageSender::Sync(tx) => {
726                tx.send(message).map_err(|_| {
727                    AgentError::SendMessageFailed("Failed to send config message".to_string())
728                })?;
729            }
730            AgentMessageSender::Async(tx) => {
731                tx.send(message).await.map_err(|_| {
732                    AgentError::SendMessageFailed("Failed to send config message".to_string())
733                })?;
734            }
735        }
736        Ok(())
737    }
738
739    /// Get global configs for the agent definition by name.
740    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
741        let global_configs_map = self.global_configs_map.lock().unwrap();
742        global_configs_map.get(def_name).cloned()
743    }
744
745    /// Set global configs for the agent definition by name.
746    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
747        let mut global_configs_map = self.global_configs_map.lock().unwrap();
748
749        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
750            global_configs_map.insert(def_name, configs);
751            return;
752        };
753
754        for (key, value) in configs {
755            existing_configs.set(key, value);
756        }
757    }
758
759    /// Get the global configs map.
760    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
761        let global_configs_map = self.global_configs_map.lock().unwrap();
762        global_configs_map.clone()
763    }
764
765    /// Set the global configs map.
766    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
767        for (agent_name, new_configs) in new_configs_map {
768            self.set_global_configs(agent_name, new_configs);
769        }
770    }
771
772    /// Send input to an agent.
773    pub(crate) async fn agent_input(
774        &self,
775        agent_id: String,
776        ctx: AgentContext,
777        pin: String,
778        value: AgentValue,
779    ) -> Result<(), AgentError> {
780        let message = if pin.starts_with("config:") {
781            let config_key = pin[7..].to_string();
782            AgentMessage::Config {
783                key: config_key,
784                value,
785            }
786        } else {
787            AgentMessage::Input {
788                ctx,
789                pin: pin.clone(),
790                value,
791            }
792        };
793
794        let tx = {
795            let agent_txs = self.agent_txs.lock().unwrap();
796            agent_txs.get(&agent_id).cloned()
797        };
798
799        let Some(tx) = tx else {
800            // The agent is not running. If it's a config message, we can set it directly.
801            let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
802                let agents = self.agents.lock().unwrap();
803                let Some(a) = agents.get(&agent_id) else {
804                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
805                };
806                a.clone()
807            };
808            if let AgentMessage::Config { key, value } = message {
809                agent.lock().await.set_config(key, value)?;
810            }
811            return Ok(());
812        };
813        match tx {
814            AgentMessageSender::Sync(tx) => {
815                tx.send(message).map_err(|_| {
816                    AgentError::SendMessageFailed("Failed to send input message".to_string())
817                })?;
818            }
819            AgentMessageSender::Async(tx) => {
820                tx.send(message).await.map_err(|_| {
821                    AgentError::SendMessageFailed("Failed to send input message".to_string())
822                })?;
823            }
824        }
825
826        self.emit_agent_input(agent_id.to_string(), pin);
827
828        Ok(())
829    }
830
    /// Send output from an agent. (Async version)
    ///
    /// Forwards `agent_id`, `ctx`, `pin` and `value` unchanged to
    /// `message::send_agent_out`, awaits it, and returns its result.
    pub async fn send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::send_agent_out(self, agent_id, ctx, pin, value).await
    }
841
    /// Send output from an agent. (Non-async version)
    ///
    /// Forwards `agent_id`, `ctx`, `pin` and `value` unchanged to
    /// `message::try_send_agent_out` and returns its result.
    pub fn try_send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_agent_out(self, agent_id, ctx, pin, value)
    }
852
    /// Write a value to the board.
    ///
    /// Calls `try_send_board_out` with the board `name`, a fresh
    /// `AgentContext::new()`, and `value`, and returns that call's result.
    pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
        self.try_send_board_out(name, AgentContext::new(), value)
    }
857
    /// Write a value to the variable board.
    ///
    /// The board name is built from `stream_id` and `name` as
    /// `%<stream_id>/<name>`; the value is then sent with a fresh
    /// `AgentContext::new()` through `try_send_board_out`, whose result is
    /// returned.
    pub fn write_var_value(
        &self,
        stream_id: &str,
        name: &str,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        // Variable boards are namespaced per stream with a leading '%'.
        let var_name = format!("%{}/{}", stream_id, name);
        self.try_send_board_out(var_name, AgentContext::new(), value)
    }
868
    /// Send `value` for the board `name` with the given context.
    ///
    /// Forwards all arguments unchanged to `message::try_send_board_out` and
    /// returns its result.
    pub(crate) fn try_send_board_out(
        &self,
        name: String,
        ctx: AgentContext,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_board_out(self, name, ctx, value)
    }
877
878    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
879        // TODO: settings for the channel size
880        let (tx, mut rx) = mpsc::channel(4096);
881        {
882            let mut tx_lock = self.tx.lock().unwrap();
883            *tx_lock = Some(tx);
884        }
885
886        // spawn the main loop
887        let askit = self.clone();
888        tokio::spawn(async move {
889            while let Some(message) = rx.recv().await {
890                use AgentEventMessage::*;
891
892                match message {
893                    AgentOut {
894                        agent,
895                        ctx,
896                        pin,
897                        value,
898                    } => {
899                        message::agent_out(&askit, agent, ctx, pin, value).await;
900                    }
901                    BoardOut { name, ctx, value } => {
902                        message::board_out(&askit, name, ctx, value).await;
903                    }
904                }
905            }
906        });
907
908        tokio::task::yield_now().await;
909
910        Ok(())
911    }
912
913    async fn start_agent_streams_on_start(&self) -> Result<(), AgentError> {
914        let run_on_start_stream_ids;
915        {
916            let agent_streams = self.streams.lock().unwrap();
917            run_on_start_stream_ids = agent_streams
918                .values()
919                .filter(|s| s.spec().run_on_start)
920                .map(|s| s.id().to_string())
921                .collect::<Vec<_>>();
922        }
923
924        for id in run_on_start_stream_ids {
925            self.start_agent_stream(&id).await.unwrap_or_else(|e| {
926                log::error!("Failed to start agent stream: {}", e);
927            });
928        }
929        Ok(())
930    }
931
932    /// Subscribe an observer to ASKit events.
933    pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
934        let mut observers = self.observers.lock().unwrap();
935        let observer_id = new_observer_id();
936        observers.insert(observer_id, observer);
937        observer_id
938    }
939
    /// Unsubscribe an observer from ASKit events.
    ///
    /// `observer_id` is the id returned by [`ASKit::subscribe`]. The result of
    /// the removal is discarded, so the caller cannot tell whether an observer
    /// with that id was registered.
    ///
    /// # Panics
    ///
    /// Panics if the `observers` mutex is poisoned.
    pub fn unsubscribe(&self, observer_id: usize) {
        let mut observers = self.observers.lock().unwrap();
        // NOTE(review): `swap_remove` presumably moves the last entry into the
        // freed slot (indexmap semantics), changing notification order — confirm.
        observers.swap_remove(&observer_id);
    }
945
    /// Notify all observers with `ASKitEvent::AgentConfigUpdated(agent_id, key, value)`.
    pub(crate) fn emit_agent_config_updated(
        &self,
        agent_id: String,
        key: String,
        value: AgentValue,
    ) {
        self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
    }
954
    /// Notify all observers with `ASKitEvent::AgentError(agent_id, message)`.
    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
    }
958
    /// Notify all observers with `ASKitEvent::AgentIn(agent_id, pin)`.
    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
    }
962
    /// Notify all observers with `ASKitEvent::AgentSpecUpdated(agent_id)`.
    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
        self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
    }
966
    /// Notify all observers with `ASKitEvent::Board(name, value)`.
    ///
    /// No filtering is applied: the check that skipped names starting with
    /// `'%'` (variables) is commented out below, so those are forwarded too.
    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
        // // ignore variables
        // if name.starts_with('%') {
        //     return;
        // }
        self.notify_observers(ASKitEvent::Board(name, value));
    }
974
    /// Deliver `event` to every registered observer, in map iteration order.
    ///
    /// The `observers` mutex is held for the whole loop, so an observer's
    /// `notify` must not call `subscribe` or `unsubscribe` on this `ASKit`:
    /// both lock the same mutex, and re-locking a `std::sync::Mutex` on the
    /// thread that already holds it never returns normally.
    ///
    /// # Panics
    ///
    /// Panics if the `observers` mutex is poisoned.
    fn notify_observers(&self, event: ASKitEvent) {
        let observers = self.observers.lock().unwrap();
        for (_id, observer) in observers.iter() {
            observer.notify(&event);
        }
    }
981}
982
/// Decide whether `new_name` is acceptable as a stream name.
///
/// A name is rejected when any of the following holds:
///
/// * it is empty or consists only of whitespace;
/// * it contains `/` and one of its `/`-separated segments is empty (leading,
///   trailing or doubled slash), `"."` or `".."`;
/// * it contains one of `\ : * ? " < > |`.
///
/// Names without a `/` are not subject to the segment rule, so a bare `"."`
/// or `".."` is accepted.
fn is_valid_stream_name(new_name: &str) -> bool {
    const FORBIDDEN_CHARS: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    if new_name.trim().is_empty() {
        return false;
    }

    // For path-like names an empty segment covers the leading, trailing and
    // consecutive slash cases in one scan.
    let is_path_like = new_name.contains('/');
    if is_path_like
        && new_name
            .split('/')
            .any(|segment| matches!(segment, "" | "." | ".."))
    {
        return false;
    }

    !new_name.chars().any(|c| FORBIDDEN_CHARS.contains(&c))
}
1014
/// Events passed to [`ASKitObserver::notify`] for every subscribed observer.
#[derive(Clone, Debug)]
pub enum ASKitEvent {
    /// Sent by `ASKit::emit_agent_config_updated`.
    AgentConfigUpdated(String, String, AgentValue), // (agent_id, key, value)
    /// Sent by `ASKit::emit_agent_error`.
    AgentError(String, String),                     // (agent_id, message)
    /// Sent by `ASKit::emit_agent_input`.
    AgentIn(String, String),                        // (agent_id, pin)
    /// Sent by `ASKit::emit_agent_spec_updated`.
    AgentSpecUpdated(String),                       // (agent_id)
    /// Sent by `ASKit::emit_board`.
    Board(String, AgentValue),                      // (board name, value)
}
1023
/// Receiver of [`ASKitEvent`]s, registered with `ASKit::subscribe`.
pub trait ASKitObserver {
    /// Called once per event. The event is borrowed; clone it to keep it.
    ///
    /// `ASKit` invokes this while holding its observers lock, so an
    /// implementation must not call `subscribe` or `unsubscribe` on the same
    /// `ASKit` from inside `notify`.
    fn notify(&self, event: &ASKitEvent);
}
1027
/// Process-wide counter backing observer ids; it starts at 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Return the counter's current value as a new observer id and advance the
/// counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    // Only the atomicity of the increment matters here, so `Relaxed` suffices.
    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
1033
1034// Agent Message
1035
/// Sending half of an agent's message channel, as stored in `ASKit::agent_txs`.
///
/// `ASKit::agent_input` sends through `Sync` without awaiting and through
/// `Async` with `.await`.
#[derive(Clone)]
pub enum AgentMessageSender {
    /// Standard-library channel sender.
    Sync(std::sync::mpsc::Sender<AgentMessage>),
    /// Tokio channel sender (`tokio::sync::mpsc`).
    Async(mpsc::Sender<AgentMessage>),
}