agent_stream_kit/
askit.rs

1use std::sync::{Arc, Mutex};
2
3use serde_json::Value;
4use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::id::{new_id, update_ids};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::spec::{AgentSpec, AgentStreamSpec, ChannelSpec};
16use crate::stream::{AgentStream, AgentStreamInfo, AgentStreams};
17use crate::value::AgentValue;
18
19const MESSAGE_LIMIT: usize = 1024;
20const EVENT_CHANNEL_CAPACITY: usize = 256;
21
22#[derive(Clone)]
23pub struct ASKit {
24    // agent id -> agent
25    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
26
27    // agent id -> sender
28    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,
29
30    // board name -> [board out agent id]
31    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
32
33    // board name -> value
34    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
35
36    // source agent id -> [target agent id / source handle / target handle]
37    pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
38
39    // agent def name -> agent definition
40    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
41
42    // agent streams (stream id -> stream)
43    pub(crate) streams: Arc<Mutex<AgentStreams>>,
44
45    // agent def name -> config
46    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
47
48    // message sender
49    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
50
51    // observers
52    pub(crate) observers: broadcast::Sender<ASKitEvent>,
53}
54
55impl ASKit {
56    pub fn new() -> Self {
57        let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
58        Self {
59            agents: Default::default(),
60            agent_txs: Default::default(),
61            board_out_agents: Default::default(),
62            board_value: Default::default(),
63            channels: Default::default(),
64            defs: Default::default(),
65            streams: Default::default(),
66            global_configs_map: Default::default(),
67            tx: Arc::new(Mutex::new(None)),
68            observers: tx,
69        }
70    }
71
72    pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
73        self.tx
74            .lock()
75            .unwrap()
76            .clone()
77            .ok_or(AgentError::TxNotInitialized)
78    }
79
80    /// Initialize ASKit.
81    pub fn init() -> Result<Self, AgentError> {
82        let askit = Self::new();
83        askit.register_agents();
84        Ok(askit)
85    }
86
87    fn register_agents(&self) {
88        registry::register_inventory_agents(self);
89    }
90
91    /// Prepare ASKit to be ready.
92    pub async fn ready(&self) -> Result<(), AgentError> {
93        self.spawn_message_loop().await?;
94        Ok(())
95    }
96
97    /// Quit ASKit.
98    pub fn quit(&self) {
99        let mut tx_lock = self.tx.lock().unwrap();
100        *tx_lock = None;
101    }
102
103    /// Register an agent definition.
104    pub fn register_agent_definiton(&self, def: AgentDefinition) {
105        let def_name = def.name.clone();
106        let def_global_configs = def.global_configs.clone();
107
108        let mut defs = self.defs.lock().unwrap();
109        defs.insert(def.name.clone(), def);
110
111        // if there is a global config, set it
112        if let Some(def_global_configs) = def_global_configs {
113            let mut new_configs = AgentConfigs::default();
114            for (key, config_entry) in def_global_configs.iter() {
115                new_configs.set(key.clone(), config_entry.value.clone());
116            }
117            self.set_global_configs(def_name, new_configs);
118        }
119    }
120
121    /// Get all agent definitions.
122    pub fn get_agent_definitions(&self) -> AgentDefinitions {
123        let defs = self.defs.lock().unwrap();
124        defs.clone()
125    }
126
127    /// Get an agent definition by name.
128    pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
129        let defs = self.defs.lock().unwrap();
130        defs.get(def_name).cloned()
131    }
132
133    /// Get the config specs of an agent definition by name.
134    pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
135        let defs = self.defs.lock().unwrap();
136        let Some(def) = defs.get(def_name) else {
137            return None;
138        };
139        def.configs.clone()
140    }
141
142    /// Get the agent spec by id.
143    pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
144        let agent = {
145            let agents = self.agents.lock().unwrap();
146            let Some(agent) = agents.get(agent_id) else {
147                return None;
148            };
149            agent.clone()
150        };
151        let agent = agent.lock().await;
152        Some(agent.spec().clone())
153    }
154
155    /// Update the agent spec by id.
156    pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
157        let agent = {
158            let agents = self.agents.lock().unwrap();
159            let Some(agent) = agents.get(agent_id) else {
160                return Err(AgentError::AgentNotFound(agent_id.to_string()));
161            };
162            agent.clone()
163        };
164        let mut agent = agent.lock().await;
165        agent.update_spec(value)?;
166        Ok(())
167    }
168
169    // streams
170
171    /// Get info of the agent stream by id.
172    pub fn get_agent_stream_info(&self, id: &str) -> Option<AgentStreamInfo> {
173        let streams = self.streams.lock().unwrap();
174        streams.get(id).map(|stream| stream.into())
175    }
176
177    /// Get infos of all agent streams.
178    pub fn get_agent_stream_infos(&self) -> Vec<AgentStreamInfo> {
179        let streams = self.streams.lock().unwrap();
180        streams.values().map(|s| s.into()).collect()
181    }
182
183    /// Get the agent stream spec by id.
184    pub async fn get_agent_stream_spec(&self, id: &str) -> Option<AgentStreamSpec> {
185        let stream_spec = {
186            let streams = self.streams.lock().unwrap();
187            streams.get(id).map(|stream| stream.spec().clone())
188        };
189        let Some(mut stream_spec) = stream_spec else {
190            return None;
191        };
192
193        // collect agent specs in the stream
194        let mut agent_specs = Vec::new();
195        for agent in &stream_spec.agents {
196            if let Some(spec) = self.get_agent_spec(&agent.id).await {
197                agent_specs.push(spec);
198            }
199        }
200        stream_spec.agents = agent_specs;
201
202        // No need to change channels
203
204        Some(stream_spec)
205    }
206
207    /// Update the agent stream spec
208    pub fn update_agent_stream_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
209        let mut streams = self.streams.lock().unwrap();
210        let Some(stream) = streams.get_mut(id) else {
211            return Err(AgentError::StreamNotFound(id.to_string()));
212        };
213        stream.update_spec(value)?;
214        Ok(())
215    }
216
217    /// Create a new agent stream with the given name.
218    /// If the name already exists, a unique name will be generated by appending a number suffix.
219    /// Returns the id of the new agent stream.
220    pub fn new_agent_stream(&self, name: &str) -> Result<String, AgentError> {
221        if !is_valid_stream_name(name) {
222            return Err(AgentError::InvalidStreamName(name.into()));
223        }
224        let new_name = self.unique_stream_name(name);
225        let spec = AgentStreamSpec::default();
226        let id = self.add_agent_stream(new_name, spec)?;
227        Ok(id)
228    }
229
230    /// Rename an existing agent stream.
231    pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
232        if !is_valid_stream_name(new_name) {
233            return Err(AgentError::InvalidStreamName(new_name.into()));
234        }
235
236        // check if the new name is already used
237        let new_name = self.unique_stream_name(new_name);
238
239        let mut streams = self.streams.lock().unwrap();
240
241        // remove the original stream
242        let Some(mut stream) = streams.swap_remove(id) else {
243            return Err(AgentError::RenameStreamFailed(id.into()));
244        };
245
246        // insert renamed stream
247        stream.set_name(new_name.clone());
248        streams.insert(stream.id().to_string(), stream);
249        Ok(new_name)
250    }
251
252    /// Generate a unique stream name by appending a number suffix if needed.
253    pub fn unique_stream_name(&self, name: &str) -> String {
254        let mut new_name = name.trim().to_string();
255        let mut i = 2;
256        let streams = self.streams.lock().unwrap();
257        while streams.values().any(|stream| stream.name() == new_name) {
258            new_name = format!("{}{}", name, i);
259            i += 1;
260        }
261        new_name
262    }
263
264    /// Add a new agent stream with the given name and spec, and returns the id of the new agent stream.
265    ///
266    /// The ids of the given spec, including agents and channels, are changed to new unique ids.
267    pub fn add_agent_stream(
268        &self,
269        name: String,
270        spec: AgentStreamSpec,
271    ) -> Result<String, AgentError> {
272        let stream = AgentStream::new(name, spec);
273        let id = stream.id().to_string();
274
275        // add agents
276        for agent in &stream.spec().agents {
277            if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
278                log::error!("Failed to add_agent {}: {}", agent.id, e);
279            }
280        }
281
282        // add channels
283        for channel in &stream.spec().channels {
284            self.add_channel_internal(channel.clone())
285                .unwrap_or_else(|e| {
286                    log::error!("Failed to add_channel {}: {}", channel.source, e);
287                });
288        }
289
290        // add the given stream into streams
291        let mut streams = self.streams.lock().unwrap();
292        if streams.contains_key(&id) {
293            return Err(AgentError::DuplicateId(id.into()));
294        }
295        streams.insert(id.to_string(), stream);
296
297        Ok(id)
298    }
299
300    /// Remove an agent stream by id.
301    pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
302        let mut stream = {
303            let mut streams = self.streams.lock().unwrap();
304            let Some(stream) = streams.swap_remove(id) else {
305                return Err(AgentError::StreamNotFound(id.to_string()));
306            };
307            stream
308        };
309
310        stream.stop(self).await.unwrap_or_else(|e| {
311            log::error!("Failed to stop stream {}: {}", id, e);
312        });
313
314        // Remove all agents and channels associated with the stream
315        for agent in &stream.spec().agents {
316            self.remove_agent_internal(&agent.id)
317                .await
318                .unwrap_or_else(|e| {
319                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
320                });
321        }
322        for channel in &stream.spec().channels {
323            self.remove_channel_internal(channel);
324        }
325
326        Ok(())
327    }
328
329    /// Start an agent stream by id.
330    pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
331        let mut stream = {
332            let mut streams = self.streams.lock().unwrap();
333            let Some(stream) = streams.swap_remove(id) else {
334                return Err(AgentError::StreamNotFound(id.to_string()));
335            };
336            stream
337        };
338
339        stream.start(self).await?;
340
341        let mut streams = self.streams.lock().unwrap();
342        streams.insert(id.to_string(), stream);
343        Ok(())
344    }
345
346    /// Stop an agent stream by id.
347    pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
348        let mut stream = {
349            let mut streams = self.streams.lock().unwrap();
350            let Some(stream) = streams.swap_remove(id) else {
351                return Err(AgentError::StreamNotFound(id.to_string()));
352            };
353            stream
354        };
355
356        stream.stop(self).await?;
357
358        let mut streams = self.streams.lock().unwrap();
359        streams.insert(id.to_string(), stream);
360        Ok(())
361    }
362
363    // Agents
364
365    /// Create a new agent spec from the given agent definition name.
366    pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
367        let def = self
368            .get_agent_definition(def_name)
369            .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
370        Ok(def.to_spec())
371    }
372
373    /// Add an agent to the specified stream, and returns the id of the newly added agent.
374    pub fn add_agent(&self, stream_id: String, mut spec: AgentSpec) -> Result<String, AgentError> {
375        let mut streams = self.streams.lock().unwrap();
376        let Some(stream) = streams.get_mut(&stream_id) else {
377            return Err(AgentError::StreamNotFound(stream_id.to_string()));
378        };
379        let id = new_id();
380        spec.id = id.clone();
381        self.add_agent_internal(stream_id, spec.clone())?;
382        stream.add_agent(spec.clone());
383        Ok(id)
384    }
385
386    fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
387        let mut agents = self.agents.lock().unwrap();
388        if agents.contains_key(&spec.id) {
389            return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
390        }
391        let spec_id = spec.id.clone();
392        let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
393        agent.set_stream_id(stream_id);
394        agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
395        Ok(())
396    }
397
398    /// Get the agent by id.
399    pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
400        let agents = self.agents.lock().unwrap();
401        agents.get(agent_id).cloned()
402    }
403
404    /// Add a channel to the specified stream.
405    pub fn add_channel(&self, stream_id: &str, channel: ChannelSpec) -> Result<(), AgentError> {
406        // check if the source and target agents exist
407        {
408            let agents = self.agents.lock().unwrap();
409            if !agents.contains_key(&channel.source) {
410                return Err(AgentError::AgentNotFound(channel.source.to_string()));
411            }
412            if !agents.contains_key(&channel.target) {
413                return Err(AgentError::AgentNotFound(channel.target.to_string()));
414            }
415        }
416
417        // check if handles are valid
418        if channel.source_handle.is_empty() {
419            return Err(AgentError::EmptySourceHandle);
420        }
421        if channel.target_handle.is_empty() {
422            return Err(AgentError::EmptyTargetHandle);
423        }
424
425        let mut streams = self.streams.lock().unwrap();
426        let Some(stream) = streams.get_mut(stream_id) else {
427            return Err(AgentError::StreamNotFound(stream_id.to_string()));
428        };
429        stream.add_channel(channel.clone());
430        self.add_channel_internal(channel)?;
431        Ok(())
432    }
433
434    fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
435        let mut channels = self.channels.lock().unwrap();
436        if let Some(targets) = channels.get_mut(&channel.source) {
437            if targets
438                .iter()
439                .any(|(target, source_handle, target_handle)| {
440                    *target == channel.target
441                        && *source_handle == channel.source_handle
442                        && *target_handle == channel.target_handle
443                })
444            {
445                return Err(AgentError::ChannelAlreadyExists);
446            }
447            targets.push((channel.target, channel.source_handle, channel.target_handle));
448        } else {
449            channels.insert(
450                channel.source,
451                vec![(channel.target, channel.source_handle, channel.target_handle)],
452            );
453        }
454        Ok(())
455    }
456
457    /// Add agents and channels to the specified stream.
458    ///
459    /// The ids of the given agents and channels are changed to new unique ids.
460    /// The agents are not started automatically, even if the stream is running.
461    pub fn add_agents_and_channels(
462        &self,
463        stream_id: &str,
464        agents: &Vec<AgentSpec>,
465        channels: &Vec<ChannelSpec>,
466    ) -> Result<(Vec<AgentSpec>, Vec<ChannelSpec>), AgentError> {
467        let (agents, channels) = update_ids(agents, channels);
468
469        let mut streams = self.streams.lock().unwrap();
470        let Some(stream) = streams.get_mut(stream_id) else {
471            return Err(AgentError::StreamNotFound(stream_id.to_string()));
472        };
473
474        for agent in &agents {
475            self.add_agent_internal(stream_id.to_string(), agent.clone())?;
476            stream.add_agent(agent.clone());
477        }
478
479        for channel in &channels {
480            self.add_channel_internal(channel.clone())?;
481            stream.add_channel(channel.clone());
482        }
483
484        Ok((agents, channels))
485    }
486
487    /// Remove an agent from the specified stream.
488    ///
489    /// If the agent is running, it will be stopped first.
490    pub async fn remove_agent(&self, stream_id: &str, agent_id: &str) -> Result<(), AgentError> {
491        {
492            let mut streams = self.streams.lock().unwrap();
493            let Some(stream) = streams.get_mut(stream_id) else {
494                return Err(AgentError::StreamNotFound(stream_id.to_string()));
495            };
496            stream.remove_agent(agent_id);
497        }
498        if let Err(e) = self.remove_agent_internal(agent_id).await {
499            return Err(e);
500        }
501        Ok(())
502    }
503
504    async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
505        self.stop_agent(agent_id).await?;
506
507        // remove from channels
508        {
509            let mut channels = self.channels.lock().unwrap();
510            let mut sources_to_remove = Vec::new();
511            for (source, targets) in channels.iter_mut() {
512                targets.retain(|(target, _, _)| target != agent_id);
513                if targets.is_empty() {
514                    sources_to_remove.push(source.clone());
515                }
516            }
517            for source in sources_to_remove {
518                channels.swap_remove(&source);
519            }
520            channels.swap_remove(agent_id);
521        }
522
523        // remove from agents
524        {
525            let mut agents = self.agents.lock().unwrap();
526            agents.swap_remove(agent_id);
527        }
528
529        Ok(())
530    }
531
532    /// Remove a channel from the specified stream.
533    pub fn remove_channel(&self, stream_id: &str, channel: &ChannelSpec) -> Result<(), AgentError> {
534        let mut stream = {
535            let mut streams = self.streams.lock().unwrap();
536            let Some(stream) = streams.swap_remove(stream_id) else {
537                return Err(AgentError::StreamNotFound(stream_id.to_string()));
538            };
539            stream
540        };
541
542        let Some(channel) = stream.remove_channel(channel) else {
543            let mut streams = self.streams.lock().unwrap();
544            streams.insert(stream_id.to_string(), stream);
545            return Err(AgentError::ChannelNotFound(format!(
546                "{}:{}->{}:{}",
547                channel.source, channel.source_handle, channel.target, channel.target_handle
548            )));
549        };
550        let mut streams = self.streams.lock().unwrap();
551        streams.insert(stream_id.to_string(), stream);
552
553        self.remove_channel_internal(&channel);
554        Ok(())
555    }
556
557    fn remove_channel_internal(&self, channel: &ChannelSpec) {
558        let mut channels = self.channels.lock().unwrap();
559        if let Some(targets) = channels.get_mut(&channel.source) {
560            targets.retain(|(target, source_handle, target_handle)| {
561                *target != channel.target
562                    || *source_handle != channel.source_handle
563                    || *target_handle != channel.target_handle
564            });
565            if targets.is_empty() {
566                channels.swap_remove(&channel.source);
567            }
568        }
569    }
570
571    /// Start an agent by id.
572    pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
573        let agent = {
574            let agents = self.agents.lock().unwrap();
575            let Some(a) = agents.get(agent_id) else {
576                return Err(AgentError::AgentNotFound(agent_id.to_string()));
577            };
578            a.clone()
579        };
580        let def_name = {
581            let agent = agent.lock().await;
582            agent.def_name().to_string()
583        };
584        let uses_native_thread = {
585            let defs = self.defs.lock().unwrap();
586            let Some(def) = defs.get(&def_name) else {
587                return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
588            };
589            def.native_thread
590        };
591        let agent_status = {
592            // This will not block since the agent is not started yet.
593            let agent = agent.lock().await;
594            agent.status().clone()
595        };
596        if agent_status == AgentStatus::Init {
597            log::info!("Starting agent {}", agent_id);
598
599            let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
600
601            {
602                let mut agent_txs = self.agent_txs.lock().unwrap();
603                agent_txs.insert(agent_id.to_string(), tx.clone());
604            };
605
606            let agent_clone = agent.clone();
607            let agent_id_clone = agent_id.to_string();
608
609            let agent_loop = async move {
610                {
611                    let mut agent_guard = agent_clone.lock().await;
612                    if let Err(e) = agent_guard.start().await {
613                        log::error!("Failed to start agent {}: {}", agent_id_clone, e);
614                        return;
615                    }
616                }
617
618                while let Some(message) = rx.recv().await {
619                    match message {
620                        AgentMessage::Input { ctx, pin, value } => {
621                            agent_clone
622                                .lock()
623                                .await
624                                .process(ctx, pin, value)
625                                .await
626                                .unwrap_or_else(|e| {
627                                    log::error!("Process Error {}: {}", agent_id_clone, e);
628                                });
629                        }
630                        AgentMessage::Config { key, value } => {
631                            agent_clone
632                                .lock()
633                                .await
634                                .set_config(key, value)
635                                .unwrap_or_else(|e| {
636                                    log::error!("Config Error {}: {}", agent_id_clone, e);
637                                });
638                        }
639                        AgentMessage::Configs { configs } => {
640                            agent_clone
641                                .lock()
642                                .await
643                                .set_configs(configs)
644                                .unwrap_or_else(|e| {
645                                    log::error!("Configs Error {}: {}", agent_id_clone, e);
646                                });
647                        }
648                        AgentMessage::Stop => {
649                            rx.close();
650                            break;
651                        }
652                    }
653                }
654            };
655
656            if uses_native_thread {
657                std::thread::spawn(move || {
658                    let rt = tokio::runtime::Builder::new_current_thread()
659                        .enable_all()
660                        .build()
661                        .unwrap();
662                    rt.block_on(agent_loop);
663                });
664            } else {
665                tokio::spawn(agent_loop);
666            }
667        }
668        Ok(())
669    }
670
671    /// Stop an agent by id.
672    pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
673        {
674            // remove the sender first to prevent new messages being sent
675            let mut agent_txs = self.agent_txs.lock().unwrap();
676            if let Some(tx) = agent_txs.swap_remove(agent_id) {
677                if let Err(e) = tx.try_send(AgentMessage::Stop) {
678                    log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
679                }
680            }
681        }
682
683        let agent = {
684            let agents = self.agents.lock().unwrap();
685            let Some(a) = agents.get(agent_id) else {
686                return Err(AgentError::AgentNotFound(agent_id.to_string()));
687            };
688            a.clone()
689        };
690        let mut agent_guard = agent.lock().await;
691        if *agent_guard.status() == AgentStatus::Start {
692            log::info!("Stopping agent {}", agent_id);
693            agent_guard.stop().await?;
694        }
695
696        Ok(())
697    }
698
699    /// Set configs for an agent by id.
700    pub async fn set_agent_configs(
701        &self,
702        agent_id: String,
703        configs: AgentConfigs,
704    ) -> Result<(), AgentError> {
705        let tx = {
706            let agent_txs = self.agent_txs.lock().unwrap();
707            agent_txs.get(&agent_id).cloned()
708        };
709
710        let Some(tx) = tx else {
711            // The agent is not running. We can set the configs directly.
712            let agent = {
713                let agents = self.agents.lock().unwrap();
714                let Some(a) = agents.get(&agent_id) else {
715                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
716                };
717                a.clone()
718            };
719            agent.lock().await.set_configs(configs.clone())?;
720            return Ok(());
721        };
722        let message = AgentMessage::Configs { configs };
723        tx.send(message).await.map_err(|_| {
724            AgentError::SendMessageFailed("Failed to send config message".to_string())
725        })?;
726        Ok(())
727    }
728
729    /// Get global configs for the agent definition by name.
730    pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
731        let global_configs_map = self.global_configs_map.lock().unwrap();
732        global_configs_map.get(def_name).cloned()
733    }
734
735    /// Set global configs for the agent definition by name.
736    pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
737        let mut global_configs_map = self.global_configs_map.lock().unwrap();
738
739        let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
740            global_configs_map.insert(def_name, configs);
741            return;
742        };
743
744        for (key, value) in configs {
745            existing_configs.set(key, value);
746        }
747    }
748
749    /// Get the global configs map.
750    pub fn get_global_configs_map(&self) -> AgentConfigsMap {
751        let global_configs_map = self.global_configs_map.lock().unwrap();
752        global_configs_map.clone()
753    }
754
755    /// Set the global configs map.
756    pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
757        for (agent_name, new_configs) in new_configs_map {
758            self.set_global_configs(agent_name, new_configs);
759        }
760    }
761
762    /// Send input to an agent.
763    pub(crate) async fn agent_input(
764        &self,
765        agent_id: String,
766        ctx: AgentContext,
767        pin: String,
768        value: AgentValue,
769    ) -> Result<(), AgentError> {
770        let message = if pin.starts_with("config:") {
771            let config_key = pin[7..].to_string();
772            AgentMessage::Config {
773                key: config_key,
774                value,
775            }
776        } else {
777            AgentMessage::Input {
778                ctx,
779                pin: pin.clone(),
780                value,
781            }
782        };
783
784        let tx = {
785            let agent_txs = self.agent_txs.lock().unwrap();
786            agent_txs.get(&agent_id).cloned()
787        };
788
789        let Some(tx) = tx else {
790            // The agent is not running. If it's a config message, we can set it directly.
791            let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
792                let agents = self.agents.lock().unwrap();
793                let Some(a) = agents.get(&agent_id) else {
794                    return Err(AgentError::AgentNotFound(agent_id.to_string()));
795                };
796                a.clone()
797            };
798            if let AgentMessage::Config { key, value } = message {
799                agent.lock().await.set_config(key, value)?;
800            }
801            return Ok(());
802        };
803        tx.send(message).await.map_err(|_| {
804            AgentError::SendMessageFailed("Failed to send input message".to_string())
805        })?;
806
807        self.emit_agent_input(agent_id.to_string(), pin);
808
809        Ok(())
810    }
811
812    /// Send output from an agent. (Async version)
813    pub async fn send_agent_out(
814        &self,
815        agent_id: String,
816        ctx: AgentContext,
817        pin: String,
818        value: AgentValue,
819    ) -> Result<(), AgentError> {
820        message::send_agent_out(self, agent_id, ctx, pin, value).await
821    }
822
823    /// Send output from an agent.
824    pub fn try_send_agent_out(
825        &self,
826        agent_id: String,
827        ctx: AgentContext,
828        pin: String,
829        value: AgentValue,
830    ) -> Result<(), AgentError> {
831        message::try_send_agent_out(self, agent_id, ctx, pin, value)
832    }
833
834    /// Write a value to the board.
835    pub async fn write_board_value(
836        &self,
837        name: String,
838        value: AgentValue,
839    ) -> Result<(), AgentError> {
840        self.send_board_out(name, AgentContext::new(), value).await
841    }
842
843    /// Write a value to the variable board.
844    pub async fn write_var_value(
845        &self,
846        stream_id: &str,
847        name: &str,
848        value: AgentValue,
849    ) -> Result<(), AgentError> {
850        let var_name = format!("%{}/{}", stream_id, name);
851        self.send_board_out(var_name, AgentContext::new(), value)
852            .await
853    }
854
855    pub(crate) async fn send_board_out(
856        &self,
857        name: String,
858        ctx: AgentContext,
859        value: AgentValue,
860    ) -> Result<(), AgentError> {
861        message::send_board_out(self, name, ctx, value).await
862    }
863
864    async fn spawn_message_loop(&self) -> Result<(), AgentError> {
865        // TODO: settings for the channel size
866        let (tx, mut rx) = mpsc::channel(4096);
867        {
868            let mut tx_lock = self.tx.lock().unwrap();
869            *tx_lock = Some(tx);
870        }
871
872        // spawn the main loop
873        let askit = self.clone();
874        tokio::spawn(async move {
875            while let Some(message) = rx.recv().await {
876                use AgentEventMessage::*;
877
878                match message {
879                    AgentOut {
880                        agent,
881                        ctx,
882                        pin,
883                        value,
884                    } => {
885                        message::agent_out(&askit, agent, ctx, pin, value).await;
886                    }
887                    BoardOut { name, ctx, value } => {
888                        message::board_out(&askit, name, ctx, value).await;
889                    }
890                }
891            }
892        });
893
894        tokio::task::yield_now().await;
895
896        Ok(())
897    }
898
899    /// Subscribe to all ASKit events.
900    pub fn subscribe(&self) -> broadcast::Receiver<ASKitEvent> {
901        self.observers.subscribe()
902    }
903
904    /// Subscribe to a specific type of `ASKitEvent`.
905    ///
906    /// It takes a closure that filters and maps the events, and returns an `mpsc::UnboundedReceiver`
907    /// that will receive only the successfully mapped events.
908    pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
909    where
910        F: FnMut(ASKitEvent) -> Option<T> + Send + 'static,
911        T: Send + 'static,
912    {
913        let (tx, rx) = mpsc::unbounded_channel();
914        let mut event_rx = self.subscribe();
915
916        tokio::spawn(async move {
917            loop {
918                match event_rx.recv().await {
919                    Ok(event) => {
920                        if let Some(mapped_event) = filter_map(event) {
921                            if tx.send(mapped_event).is_err() {
922                                // Receiver dropped, task can exit
923                                break;
924                            }
925                        }
926                    }
927                    Err(RecvError::Lagged(n)) => {
928                        log::warn!("Event subscriber lagged by {} events", n);
929                    }
930                    Err(RecvError::Closed) => {
931                        // Sender dropped, task can exit
932                        break;
933                    }
934                }
935            }
936        });
937        rx
938    }
939
940    pub(crate) fn emit_agent_config_updated(
941        &self,
942        agent_id: String,
943        key: String,
944        value: AgentValue,
945    ) {
946        self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
947    }
948
949    pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
950        self.notify_observers(ASKitEvent::AgentError(agent_id, message));
951    }
952
953    pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
954        self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
955    }
956
957    pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
958        self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
959    }
960
961    pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
962        // // ignore variables
963        // if name.starts_with('%') {
964        //     return;
965        // }
966        self.notify_observers(ASKitEvent::Board(name, value));
967    }
968
969    fn notify_observers(&self, event: ASKitEvent) {
970        let _ = self.observers.send(event);
971    }
972}
973
974fn is_valid_stream_name(new_name: &str) -> bool {
975    // Check if the name is empty
976    if new_name.trim().is_empty() {
977        return false;
978    }
979
980    // Checks for path-like names:
981    if new_name.contains('/') {
982        // Disallow leading, trailing, or consecutive slashes
983        if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
984            return false;
985        }
986        // Disallow segments that are "." or ".."
987        if new_name
988            .split('/')
989            .any(|segment| segment == "." || segment == "..")
990        {
991            return false;
992        }
993    }
994
995    // Check if the name contains invalid characters
996    let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
997    for c in invalid_chars {
998        if new_name.contains(c) {
999            return false;
1000        }
1001    }
1002
1003    true
1004}
1005
1006#[derive(Clone, Debug)]
1007pub enum ASKitEvent {
1008    AgentConfigUpdated(String, String, AgentValue), // (agent_id, key, value)
1009    AgentError(String, String),                     // (agent_id, message)
1010    AgentIn(String, String),                        // (agent_id, pin)
1011    AgentSpecUpdated(String),                       // (agent_id)
1012    Board(String, AgentValue),                      // (board name, value)
1013}