1use std::sync::{Arc, Mutex};
2
3use serde_json::Value;
4use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::id::{new_id, update_ids};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::spec::{AgentSpec, AgentStreamSpec, ChannelSpec};
16use crate::stream::{AgentStream, AgentStreamInfo, AgentStreams};
17use crate::value::AgentValue;
18
/// Capacity of the bounded `mpsc` channel created for each agent in `start_agent`.
const MESSAGE_LIMIT: usize = 1024;
/// Capacity of the `broadcast` channel that carries `ASKitEvent`s to observers.
const EVENT_CHANNEL_CAPACITY: usize = 256;
21
/// Shared runtime state for agents, streams, channels and their configuration.
///
/// Every field is either wrapped in an `Arc` or is a cloneable sender, so a
/// cloned `ASKit` is another handle to the same underlying state.
#[derive(Clone)]
pub struct ASKit {
    /// Agent instances keyed by agent id; each sits behind its own async mutex.
    pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,

    /// Message senders of started agents, keyed by agent id
    /// (inserted by `start_agent`, removed by `stop_agent`).
    pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,

    /// NOTE(review): not touched in this file; presumably maps a board name to
    /// the ids of agents fed from that board — confirm in `message`.
    pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,

    /// NOTE(review): not touched in this file; presumably the latest value
    /// written to each board — confirm in `message`.
    pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,

    /// Routing table: source agent id -> `(target agent id, source handle, target handle)`.
    pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,

    /// Registered agent definitions, keyed by definition name.
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    /// Agent streams, keyed by stream id.
    pub(crate) streams: Arc<Mutex<AgentStreams>>,

    /// Global configs, keyed by agent definition name.
    pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,

    /// Sender into the event loop; `None` until `ready` runs and again after `quit`.
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    /// Broadcast sender used to publish `ASKitEvent`s to subscribers.
    pub(crate) observers: broadcast::Sender<ASKitEvent>,
}
54
55impl ASKit {
56 pub fn new() -> Self {
57 let (tx, _rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
58 Self {
59 agents: Default::default(),
60 agent_txs: Default::default(),
61 board_out_agents: Default::default(),
62 board_value: Default::default(),
63 channels: Default::default(),
64 defs: Default::default(),
65 streams: Default::default(),
66 global_configs_map: Default::default(),
67 tx: Arc::new(Mutex::new(None)),
68 observers: tx,
69 }
70 }
71
72 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
73 self.tx
74 .lock()
75 .unwrap()
76 .clone()
77 .ok_or(AgentError::TxNotInitialized)
78 }
79
80 pub fn init() -> Result<Self, AgentError> {
82 let askit = Self::new();
83 askit.register_agents();
84 Ok(askit)
85 }
86
87 fn register_agents(&self) {
88 registry::register_inventory_agents(self);
89 }
90
91 pub async fn ready(&self) -> Result<(), AgentError> {
93 self.spawn_message_loop().await?;
94 Ok(())
95 }
96
97 pub fn quit(&self) {
99 let mut tx_lock = self.tx.lock().unwrap();
100 *tx_lock = None;
101 }
102
103 pub fn register_agent_definiton(&self, def: AgentDefinition) {
105 let def_name = def.name.clone();
106 let def_global_configs = def.global_configs.clone();
107
108 let mut defs = self.defs.lock().unwrap();
109 defs.insert(def.name.clone(), def);
110
111 if let Some(def_global_configs) = def_global_configs {
113 let mut new_configs = AgentConfigs::default();
114 for (key, config_entry) in def_global_configs.iter() {
115 new_configs.set(key.clone(), config_entry.value.clone());
116 }
117 self.set_global_configs(def_name, new_configs);
118 }
119 }
120
121 pub fn get_agent_definitions(&self) -> AgentDefinitions {
123 let defs = self.defs.lock().unwrap();
124 defs.clone()
125 }
126
127 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
129 let defs = self.defs.lock().unwrap();
130 defs.get(def_name).cloned()
131 }
132
133 pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
135 let defs = self.defs.lock().unwrap();
136 let Some(def) = defs.get(def_name) else {
137 return None;
138 };
139 def.configs.clone()
140 }
141
142 pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
144 let agent = {
145 let agents = self.agents.lock().unwrap();
146 let Some(agent) = agents.get(agent_id) else {
147 return None;
148 };
149 agent.clone()
150 };
151 let agent = agent.lock().await;
152 Some(agent.spec().clone())
153 }
154
155 pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
157 let agent = {
158 let agents = self.agents.lock().unwrap();
159 let Some(agent) = agents.get(agent_id) else {
160 return Err(AgentError::AgentNotFound(agent_id.to_string()));
161 };
162 agent.clone()
163 };
164 let mut agent = agent.lock().await;
165 agent.update_spec(value)?;
166 Ok(())
167 }
168
169 pub fn get_agent_stream_info(&self, id: &str) -> Option<AgentStreamInfo> {
173 let streams = self.streams.lock().unwrap();
174 streams.get(id).map(|stream| stream.into())
175 }
176
177 pub fn get_agent_stream_infos(&self) -> Vec<AgentStreamInfo> {
179 let streams = self.streams.lock().unwrap();
180 streams.values().map(|s| s.into()).collect()
181 }
182
183 pub async fn get_agent_stream_spec(&self, id: &str) -> Option<AgentStreamSpec> {
185 let stream_spec = {
186 let streams = self.streams.lock().unwrap();
187 streams.get(id).map(|stream| stream.spec().clone())
188 };
189 let Some(mut stream_spec) = stream_spec else {
190 return None;
191 };
192
193 let mut agent_specs = Vec::new();
195 for agent in &stream_spec.agents {
196 if let Some(spec) = self.get_agent_spec(&agent.id).await {
197 agent_specs.push(spec);
198 }
199 }
200 stream_spec.agents = agent_specs;
201
202 Some(stream_spec)
205 }
206
207 pub fn update_agent_stream_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
209 let mut streams = self.streams.lock().unwrap();
210 let Some(stream) = streams.get_mut(id) else {
211 return Err(AgentError::StreamNotFound(id.to_string()));
212 };
213 stream.update_spec(value)?;
214 Ok(())
215 }
216
217 pub fn new_agent_stream(&self, name: &str) -> Result<String, AgentError> {
221 if !is_valid_stream_name(name) {
222 return Err(AgentError::InvalidStreamName(name.into()));
223 }
224 let new_name = self.unique_stream_name(name);
225 let spec = AgentStreamSpec::default();
226 let id = self.add_agent_stream(new_name, spec)?;
227 Ok(id)
228 }
229
230 pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
232 if !is_valid_stream_name(new_name) {
233 return Err(AgentError::InvalidStreamName(new_name.into()));
234 }
235
236 let new_name = self.unique_stream_name(new_name);
238
239 let mut streams = self.streams.lock().unwrap();
240
241 let Some(mut stream) = streams.swap_remove(id) else {
243 return Err(AgentError::RenameStreamFailed(id.into()));
244 };
245
246 stream.set_name(new_name.clone());
248 streams.insert(stream.id().to_string(), stream);
249 Ok(new_name)
250 }
251
252 pub fn unique_stream_name(&self, name: &str) -> String {
254 let mut new_name = name.trim().to_string();
255 let mut i = 2;
256 let streams = self.streams.lock().unwrap();
257 while streams.values().any(|stream| stream.name() == new_name) {
258 new_name = format!("{}{}", name, i);
259 i += 1;
260 }
261 new_name
262 }
263
264 pub fn add_agent_stream(
268 &self,
269 name: String,
270 spec: AgentStreamSpec,
271 ) -> Result<String, AgentError> {
272 let stream = AgentStream::new(name, spec);
273 let id = stream.id().to_string();
274
275 for agent in &stream.spec().agents {
277 if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
278 log::error!("Failed to add_agent {}: {}", agent.id, e);
279 }
280 }
281
282 for channel in &stream.spec().channels {
284 self.add_channel_internal(channel.clone())
285 .unwrap_or_else(|e| {
286 log::error!("Failed to add_channel {}: {}", channel.source, e);
287 });
288 }
289
290 let mut streams = self.streams.lock().unwrap();
292 if streams.contains_key(&id) {
293 return Err(AgentError::DuplicateId(id.into()));
294 }
295 streams.insert(id.to_string(), stream);
296
297 Ok(id)
298 }
299
300 pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
302 let mut stream = {
303 let mut streams = self.streams.lock().unwrap();
304 let Some(stream) = streams.swap_remove(id) else {
305 return Err(AgentError::StreamNotFound(id.to_string()));
306 };
307 stream
308 };
309
310 stream.stop(self).await.unwrap_or_else(|e| {
311 log::error!("Failed to stop stream {}: {}", id, e);
312 });
313
314 for agent in &stream.spec().agents {
316 self.remove_agent_internal(&agent.id)
317 .await
318 .unwrap_or_else(|e| {
319 log::error!("Failed to remove_agent {}: {}", agent.id, e);
320 });
321 }
322 for channel in &stream.spec().channels {
323 self.remove_channel_internal(channel);
324 }
325
326 Ok(())
327 }
328
329 pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
331 let mut stream = {
332 let mut streams = self.streams.lock().unwrap();
333 let Some(stream) = streams.swap_remove(id) else {
334 return Err(AgentError::StreamNotFound(id.to_string()));
335 };
336 stream
337 };
338
339 stream.start(self).await?;
340
341 let mut streams = self.streams.lock().unwrap();
342 streams.insert(id.to_string(), stream);
343 Ok(())
344 }
345
346 pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
348 let mut stream = {
349 let mut streams = self.streams.lock().unwrap();
350 let Some(stream) = streams.swap_remove(id) else {
351 return Err(AgentError::StreamNotFound(id.to_string()));
352 };
353 stream
354 };
355
356 stream.stop(self).await?;
357
358 let mut streams = self.streams.lock().unwrap();
359 streams.insert(id.to_string(), stream);
360 Ok(())
361 }
362
363 pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
367 let def = self
368 .get_agent_definition(def_name)
369 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
370 Ok(def.to_spec())
371 }
372
373 pub fn add_agent(&self, stream_id: String, mut spec: AgentSpec) -> Result<String, AgentError> {
375 let mut streams = self.streams.lock().unwrap();
376 let Some(stream) = streams.get_mut(&stream_id) else {
377 return Err(AgentError::StreamNotFound(stream_id.to_string()));
378 };
379 let id = new_id();
380 spec.id = id.clone();
381 self.add_agent_internal(stream_id, spec.clone())?;
382 stream.add_agent(spec.clone());
383 Ok(id)
384 }
385
386 fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
387 let mut agents = self.agents.lock().unwrap();
388 if agents.contains_key(&spec.id) {
389 return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
390 }
391 let spec_id = spec.id.clone();
392 let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
393 agent.set_stream_id(stream_id);
394 agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
395 Ok(())
396 }
397
398 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
400 let agents = self.agents.lock().unwrap();
401 agents.get(agent_id).cloned()
402 }
403
404 pub fn add_channel(&self, stream_id: &str, channel: ChannelSpec) -> Result<(), AgentError> {
406 {
408 let agents = self.agents.lock().unwrap();
409 if !agents.contains_key(&channel.source) {
410 return Err(AgentError::AgentNotFound(channel.source.to_string()));
411 }
412 if !agents.contains_key(&channel.target) {
413 return Err(AgentError::AgentNotFound(channel.target.to_string()));
414 }
415 }
416
417 if channel.source_handle.is_empty() {
419 return Err(AgentError::EmptySourceHandle);
420 }
421 if channel.target_handle.is_empty() {
422 return Err(AgentError::EmptyTargetHandle);
423 }
424
425 let mut streams = self.streams.lock().unwrap();
426 let Some(stream) = streams.get_mut(stream_id) else {
427 return Err(AgentError::StreamNotFound(stream_id.to_string()));
428 };
429 stream.add_channel(channel.clone());
430 self.add_channel_internal(channel)?;
431 Ok(())
432 }
433
434 fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
435 let mut channels = self.channels.lock().unwrap();
436 if let Some(targets) = channels.get_mut(&channel.source) {
437 if targets
438 .iter()
439 .any(|(target, source_handle, target_handle)| {
440 *target == channel.target
441 && *source_handle == channel.source_handle
442 && *target_handle == channel.target_handle
443 })
444 {
445 return Err(AgentError::ChannelAlreadyExists);
446 }
447 targets.push((channel.target, channel.source_handle, channel.target_handle));
448 } else {
449 channels.insert(
450 channel.source,
451 vec![(channel.target, channel.source_handle, channel.target_handle)],
452 );
453 }
454 Ok(())
455 }
456
457 pub fn add_agents_and_channels(
462 &self,
463 stream_id: &str,
464 agents: &Vec<AgentSpec>,
465 channels: &Vec<ChannelSpec>,
466 ) -> Result<(Vec<AgentSpec>, Vec<ChannelSpec>), AgentError> {
467 let (agents, channels) = update_ids(agents, channels);
468
469 let mut streams = self.streams.lock().unwrap();
470 let Some(stream) = streams.get_mut(stream_id) else {
471 return Err(AgentError::StreamNotFound(stream_id.to_string()));
472 };
473
474 for agent in &agents {
475 self.add_agent_internal(stream_id.to_string(), agent.clone())?;
476 stream.add_agent(agent.clone());
477 }
478
479 for channel in &channels {
480 self.add_channel_internal(channel.clone())?;
481 stream.add_channel(channel.clone());
482 }
483
484 Ok((agents, channels))
485 }
486
487 pub async fn remove_agent(&self, stream_id: &str, agent_id: &str) -> Result<(), AgentError> {
491 {
492 let mut streams = self.streams.lock().unwrap();
493 let Some(stream) = streams.get_mut(stream_id) else {
494 return Err(AgentError::StreamNotFound(stream_id.to_string()));
495 };
496 stream.remove_agent(agent_id);
497 }
498 if let Err(e) = self.remove_agent_internal(agent_id).await {
499 return Err(e);
500 }
501 Ok(())
502 }
503
504 async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
505 self.stop_agent(agent_id).await?;
506
507 {
509 let mut channels = self.channels.lock().unwrap();
510 let mut sources_to_remove = Vec::new();
511 for (source, targets) in channels.iter_mut() {
512 targets.retain(|(target, _, _)| target != agent_id);
513 if targets.is_empty() {
514 sources_to_remove.push(source.clone());
515 }
516 }
517 for source in sources_to_remove {
518 channels.swap_remove(&source);
519 }
520 channels.swap_remove(agent_id);
521 }
522
523 {
525 let mut agents = self.agents.lock().unwrap();
526 agents.swap_remove(agent_id);
527 }
528
529 Ok(())
530 }
531
532 pub fn remove_channel(&self, stream_id: &str, channel: &ChannelSpec) -> Result<(), AgentError> {
534 let mut stream = {
535 let mut streams = self.streams.lock().unwrap();
536 let Some(stream) = streams.swap_remove(stream_id) else {
537 return Err(AgentError::StreamNotFound(stream_id.to_string()));
538 };
539 stream
540 };
541
542 let Some(channel) = stream.remove_channel(channel) else {
543 let mut streams = self.streams.lock().unwrap();
544 streams.insert(stream_id.to_string(), stream);
545 return Err(AgentError::ChannelNotFound(format!(
546 "{}:{}->{}:{}",
547 channel.source, channel.source_handle, channel.target, channel.target_handle
548 )));
549 };
550 let mut streams = self.streams.lock().unwrap();
551 streams.insert(stream_id.to_string(), stream);
552
553 self.remove_channel_internal(&channel);
554 Ok(())
555 }
556
557 fn remove_channel_internal(&self, channel: &ChannelSpec) {
558 let mut channels = self.channels.lock().unwrap();
559 if let Some(targets) = channels.get_mut(&channel.source) {
560 targets.retain(|(target, source_handle, target_handle)| {
561 *target != channel.target
562 || *source_handle != channel.source_handle
563 || *target_handle != channel.target_handle
564 });
565 if targets.is_empty() {
566 channels.swap_remove(&channel.source);
567 }
568 }
569 }
570
571 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
573 let agent = {
574 let agents = self.agents.lock().unwrap();
575 let Some(a) = agents.get(agent_id) else {
576 return Err(AgentError::AgentNotFound(agent_id.to_string()));
577 };
578 a.clone()
579 };
580 let def_name = {
581 let agent = agent.lock().await;
582 agent.def_name().to_string()
583 };
584 let uses_native_thread = {
585 let defs = self.defs.lock().unwrap();
586 let Some(def) = defs.get(&def_name) else {
587 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
588 };
589 def.native_thread
590 };
591 let agent_status = {
592 let agent = agent.lock().await;
594 agent.status().clone()
595 };
596 if agent_status == AgentStatus::Init {
597 log::info!("Starting agent {}", agent_id);
598
599 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
600
601 {
602 let mut agent_txs = self.agent_txs.lock().unwrap();
603 agent_txs.insert(agent_id.to_string(), tx.clone());
604 };
605
606 let agent_clone = agent.clone();
607 let agent_id_clone = agent_id.to_string();
608
609 let agent_loop = async move {
610 {
611 let mut agent_guard = agent_clone.lock().await;
612 if let Err(e) = agent_guard.start().await {
613 log::error!("Failed to start agent {}: {}", agent_id_clone, e);
614 return;
615 }
616 }
617
618 while let Some(message) = rx.recv().await {
619 match message {
620 AgentMessage::Input { ctx, pin, value } => {
621 agent_clone
622 .lock()
623 .await
624 .process(ctx, pin, value)
625 .await
626 .unwrap_or_else(|e| {
627 log::error!("Process Error {}: {}", agent_id_clone, e);
628 });
629 }
630 AgentMessage::Config { key, value } => {
631 agent_clone
632 .lock()
633 .await
634 .set_config(key, value)
635 .unwrap_or_else(|e| {
636 log::error!("Config Error {}: {}", agent_id_clone, e);
637 });
638 }
639 AgentMessage::Configs { configs } => {
640 agent_clone
641 .lock()
642 .await
643 .set_configs(configs)
644 .unwrap_or_else(|e| {
645 log::error!("Configs Error {}: {}", agent_id_clone, e);
646 });
647 }
648 AgentMessage::Stop => {
649 rx.close();
650 break;
651 }
652 }
653 }
654 };
655
656 if uses_native_thread {
657 std::thread::spawn(move || {
658 let rt = tokio::runtime::Builder::new_current_thread()
659 .enable_all()
660 .build()
661 .unwrap();
662 rt.block_on(agent_loop);
663 });
664 } else {
665 tokio::spawn(agent_loop);
666 }
667 }
668 Ok(())
669 }
670
671 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
673 {
674 let mut agent_txs = self.agent_txs.lock().unwrap();
676 if let Some(tx) = agent_txs.swap_remove(agent_id) {
677 if let Err(e) = tx.try_send(AgentMessage::Stop) {
678 log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
679 }
680 }
681 }
682
683 let agent = {
684 let agents = self.agents.lock().unwrap();
685 let Some(a) = agents.get(agent_id) else {
686 return Err(AgentError::AgentNotFound(agent_id.to_string()));
687 };
688 a.clone()
689 };
690 let mut agent_guard = agent.lock().await;
691 if *agent_guard.status() == AgentStatus::Start {
692 log::info!("Stopping agent {}", agent_id);
693 agent_guard.stop().await?;
694 }
695
696 Ok(())
697 }
698
699 pub async fn set_agent_configs(
701 &self,
702 agent_id: String,
703 configs: AgentConfigs,
704 ) -> Result<(), AgentError> {
705 let tx = {
706 let agent_txs = self.agent_txs.lock().unwrap();
707 agent_txs.get(&agent_id).cloned()
708 };
709
710 let Some(tx) = tx else {
711 let agent = {
713 let agents = self.agents.lock().unwrap();
714 let Some(a) = agents.get(&agent_id) else {
715 return Err(AgentError::AgentNotFound(agent_id.to_string()));
716 };
717 a.clone()
718 };
719 agent.lock().await.set_configs(configs.clone())?;
720 return Ok(());
721 };
722 let message = AgentMessage::Configs { configs };
723 tx.send(message).await.map_err(|_| {
724 AgentError::SendMessageFailed("Failed to send config message".to_string())
725 })?;
726 Ok(())
727 }
728
729 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
731 let global_configs_map = self.global_configs_map.lock().unwrap();
732 global_configs_map.get(def_name).cloned()
733 }
734
735 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
737 let mut global_configs_map = self.global_configs_map.lock().unwrap();
738
739 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
740 global_configs_map.insert(def_name, configs);
741 return;
742 };
743
744 for (key, value) in configs {
745 existing_configs.set(key, value);
746 }
747 }
748
749 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
751 let global_configs_map = self.global_configs_map.lock().unwrap();
752 global_configs_map.clone()
753 }
754
755 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
757 for (agent_name, new_configs) in new_configs_map {
758 self.set_global_configs(agent_name, new_configs);
759 }
760 }
761
762 pub(crate) async fn agent_input(
764 &self,
765 agent_id: String,
766 ctx: AgentContext,
767 pin: String,
768 value: AgentValue,
769 ) -> Result<(), AgentError> {
770 let message = if pin.starts_with("config:") {
771 let config_key = pin[7..].to_string();
772 AgentMessage::Config {
773 key: config_key,
774 value,
775 }
776 } else {
777 AgentMessage::Input {
778 ctx,
779 pin: pin.clone(),
780 value,
781 }
782 };
783
784 let tx = {
785 let agent_txs = self.agent_txs.lock().unwrap();
786 agent_txs.get(&agent_id).cloned()
787 };
788
789 let Some(tx) = tx else {
790 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
792 let agents = self.agents.lock().unwrap();
793 let Some(a) = agents.get(&agent_id) else {
794 return Err(AgentError::AgentNotFound(agent_id.to_string()));
795 };
796 a.clone()
797 };
798 if let AgentMessage::Config { key, value } = message {
799 agent.lock().await.set_config(key, value)?;
800 }
801 return Ok(());
802 };
803 tx.send(message).await.map_err(|_| {
804 AgentError::SendMessageFailed("Failed to send input message".to_string())
805 })?;
806
807 self.emit_agent_input(agent_id.to_string(), pin);
808
809 Ok(())
810 }
811
812 pub async fn send_agent_out(
814 &self,
815 agent_id: String,
816 ctx: AgentContext,
817 pin: String,
818 value: AgentValue,
819 ) -> Result<(), AgentError> {
820 message::send_agent_out(self, agent_id, ctx, pin, value).await
821 }
822
823 pub fn try_send_agent_out(
825 &self,
826 agent_id: String,
827 ctx: AgentContext,
828 pin: String,
829 value: AgentValue,
830 ) -> Result<(), AgentError> {
831 message::try_send_agent_out(self, agent_id, ctx, pin, value)
832 }
833
834 pub async fn write_board_value(
836 &self,
837 name: String,
838 value: AgentValue,
839 ) -> Result<(), AgentError> {
840 self.send_board_out(name, AgentContext::new(), value).await
841 }
842
843 pub async fn write_var_value(
845 &self,
846 stream_id: &str,
847 name: &str,
848 value: AgentValue,
849 ) -> Result<(), AgentError> {
850 let var_name = format!("%{}/{}", stream_id, name);
851 self.send_board_out(var_name, AgentContext::new(), value)
852 .await
853 }
854
855 pub(crate) async fn send_board_out(
856 &self,
857 name: String,
858 ctx: AgentContext,
859 value: AgentValue,
860 ) -> Result<(), AgentError> {
861 message::send_board_out(self, name, ctx, value).await
862 }
863
864 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
865 let (tx, mut rx) = mpsc::channel(4096);
867 {
868 let mut tx_lock = self.tx.lock().unwrap();
869 *tx_lock = Some(tx);
870 }
871
872 let askit = self.clone();
874 tokio::spawn(async move {
875 while let Some(message) = rx.recv().await {
876 use AgentEventMessage::*;
877
878 match message {
879 AgentOut {
880 agent,
881 ctx,
882 pin,
883 value,
884 } => {
885 message::agent_out(&askit, agent, ctx, pin, value).await;
886 }
887 BoardOut { name, ctx, value } => {
888 message::board_out(&askit, name, ctx, value).await;
889 }
890 }
891 }
892 });
893
894 tokio::task::yield_now().await;
895
896 Ok(())
897 }
898
899 pub fn subscribe(&self) -> broadcast::Receiver<ASKitEvent> {
901 self.observers.subscribe()
902 }
903
904 pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
909 where
910 F: FnMut(ASKitEvent) -> Option<T> + Send + 'static,
911 T: Send + 'static,
912 {
913 let (tx, rx) = mpsc::unbounded_channel();
914 let mut event_rx = self.subscribe();
915
916 tokio::spawn(async move {
917 loop {
918 match event_rx.recv().await {
919 Ok(event) => {
920 if let Some(mapped_event) = filter_map(event) {
921 if tx.send(mapped_event).is_err() {
922 break;
924 }
925 }
926 }
927 Err(RecvError::Lagged(n)) => {
928 log::warn!("Event subscriber lagged by {} events", n);
929 }
930 Err(RecvError::Closed) => {
931 break;
933 }
934 }
935 }
936 });
937 rx
938 }
939
940 pub(crate) fn emit_agent_config_updated(
941 &self,
942 agent_id: String,
943 key: String,
944 value: AgentValue,
945 ) {
946 self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
947 }
948
949 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
950 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
951 }
952
953 pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
954 self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
955 }
956
957 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
958 self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
959 }
960
961 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
962 self.notify_observers(ASKitEvent::Board(name, value));
967 }
968
969 fn notify_observers(&self, event: ASKitEvent) {
970 let _ = self.observers.send(event);
971 }
972}
973
/// Reports whether `new_name` is acceptable as a stream name.
///
/// The name is trimmed before it is checked, because callers store the
/// trimmed form (see `unique_stream_name`). A valid name:
/// * is not empty after trimming;
/// * if it contains `/`, has no empty segment (no leading, trailing or
///   doubled slash) and no `.` or `..` segment;
/// * contains none of `\ : * ? " < > |`.
fn is_valid_stream_name(new_name: &str) -> bool {
    const INVALID_CHARS: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    let name = new_name.trim();
    if name.is_empty() {
        return false;
    }

    // An empty segment is exactly a leading, trailing or doubled slash.
    if name.contains('/')
        && name
            .split('/')
            .any(|segment| segment.is_empty() || segment == "." || segment == "..")
    {
        return false;
    }

    !name.chars().any(|c| INVALID_CHARS.contains(&c))
}
1005
/// Events broadcast to observers obtained through `ASKit::subscribe`.
#[derive(Clone, Debug)]
pub enum ASKitEvent {
    /// Sent by `emit_agent_config_updated` as `(agent_id, key, value)`.
    AgentConfigUpdated(String, String, AgentValue),
    /// Sent by `emit_agent_error` as `(agent_id, message)`.
    AgentError(String, String),
    /// Sent by `emit_agent_input` as `(agent_id, pin)`.
    AgentIn(String, String),
    /// Sent by `emit_agent_spec_updated` as `(agent_id)`.
    AgentSpecUpdated(String),
    /// Sent by `emit_board` as `(name, value)`.
    Board(String, AgentValue),
}