1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use im::Vector;
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::FnvIndexMap;
8use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::id::{new_id, update_ids};
14use crate::message::{self, AgentEventMessage};
15use crate::registry;
16use crate::spec::{AgentSpec, AgentStreamSpec, ChannelSpec};
17use crate::stream::{AgentStream, AgentStreamInfo, AgentStreams};
18use crate::value::AgentValue;
19
/// Capacity of the bounded Tokio channel created for each agent that runs on the async runtime.
const MESSAGE_LIMIT: usize = 1024;
21
22#[derive(Clone)]
23pub struct ASKit {
24 pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
26
27 pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,
29
30 pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
32
33 pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
35
36 pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
38
39 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
41
42 pub(crate) streams: Arc<Mutex<AgentStreams>>,
44
45 pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
47
48 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
50
51 pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
53}
54
55impl ASKit {
56 pub fn new() -> Self {
57 Self {
58 agents: Default::default(),
59 agent_txs: Default::default(),
60 board_out_agents: Default::default(),
61 board_value: Default::default(),
62 channels: Default::default(),
63 defs: Default::default(),
64 streams: Default::default(),
65 global_configs_map: Default::default(),
66 tx: Arc::new(Mutex::new(None)),
67 observers: Default::default(),
68 }
69 }
70
71 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
72 self.tx
73 .lock()
74 .unwrap()
75 .clone()
76 .ok_or(AgentError::TxNotInitialized)
77 }
78
79 pub fn init() -> Result<Self, AgentError> {
81 let askit = Self::new();
82 askit.register_agents();
83 Ok(askit)
84 }
85
86 fn register_agents(&self) {
87 registry::register_inventory_agents(self);
88 }
89
90 pub async fn ready(&self) -> Result<(), AgentError> {
92 self.spawn_message_loop().await?;
93 self.start_agent_streams_on_start().await?;
94 Ok(())
95 }
96
97 pub fn quit(&self) {
99 let mut tx_lock = self.tx.lock().unwrap();
100 *tx_lock = None;
101 }
102
103 pub fn register_agent_definiton(&self, def: AgentDefinition) {
105 let def_name = def.name.clone();
106 let def_global_configs = def.global_configs.clone();
107
108 let mut defs = self.defs.lock().unwrap();
109 defs.insert(def.name.clone(), def);
110
111 if let Some(def_global_configs) = def_global_configs {
113 let mut new_configs = AgentConfigs::default();
114 for (key, config_entry) in def_global_configs.iter() {
115 new_configs.set(key.clone(), config_entry.value.clone());
116 }
117 self.set_global_configs(def_name, new_configs);
118 }
119 }
120
121 pub fn get_agent_definitions(&self) -> AgentDefinitions {
123 let defs = self.defs.lock().unwrap();
124 defs.clone()
125 }
126
127 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
129 let defs = self.defs.lock().unwrap();
130 defs.get(def_name).cloned()
131 }
132
133 pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
135 let defs = self.defs.lock().unwrap();
136 let Some(def) = defs.get(def_name) else {
137 return None;
138 };
139 def.configs.clone()
140 }
141
142 pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
144 let agents = self.agents.lock().unwrap();
145 let Some(agent) = agents.get(agent_id) else {
146 return None;
147 };
148 let agent = agent.blocking_lock();
149 Some(agent.spec().clone())
150 }
151
152 pub fn get_agent_stream_info(&self, id: &str) -> Option<AgentStreamInfo> {
156 let streams = self.streams.lock().unwrap();
157 streams.get(id).map(|stream| stream.into())
158 }
159
160 pub fn get_agent_stream_infos(&self) -> Vec<AgentStreamInfo> {
162 let streams = self.streams.lock().unwrap();
163 streams.values().map(|s| s.into()).collect()
164 }
165
166 pub fn get_agent_stream_spec(&self, id: &str) -> Option<AgentStreamSpec> {
168 let streams = self.streams.lock().unwrap();
169 streams.get(id).map(|stream| stream.spec().clone())
170 }
171
172 pub fn set_agent_stream_spec(&self, id: &str, spec: AgentStreamSpec) -> Result<(), AgentError> {
174 let mut streams = self.streams.lock().unwrap();
175 let Some(stream) = streams.get_mut(id) else {
176 return Err(AgentError::StreamNotFound(id.to_string()));
177 };
178 *stream.spec_mut() = spec;
179 Ok(())
180 }
181
182 pub fn new_agent_stream(&self, name: &str) -> Result<String, AgentError> {
186 if !is_valid_stream_name(name) {
187 return Err(AgentError::InvalidStreamName(name.into()));
188 }
189 let new_name = self.unique_stream_name(name);
190 let spec = AgentStreamSpec::default();
191 let id = self.add_agent_stream(new_name, spec)?;
192 Ok(id)
193 }
194
195 pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
197 if !is_valid_stream_name(new_name) {
198 return Err(AgentError::InvalidStreamName(new_name.into()));
199 }
200
201 let new_name = self.unique_stream_name(new_name);
203
204 let mut streams = self.streams.lock().unwrap();
205
206 let Some(mut stream) = streams.swap_remove(id) else {
208 return Err(AgentError::RenameStreamFailed(id.into()));
209 };
210
211 stream.set_name(new_name.clone());
213 streams.insert(stream.id().to_string(), stream);
214 Ok(new_name)
215 }
216
217 pub fn unique_stream_name(&self, name: &str) -> String {
219 let mut new_name = name.trim().to_string();
220 let mut i = 2;
221 let streams = self.streams.lock().unwrap();
222 while streams.values().any(|stream| stream.name() == new_name) {
223 new_name = format!("{}{}", name, i);
224 i += 1;
225 }
226 new_name
227 }
228
229 pub fn add_agent_stream(
233 &self,
234 name: String,
235 spec: AgentStreamSpec,
236 ) -> Result<String, AgentError> {
237 let stream = AgentStream::new(name, spec);
238 let id = stream.id().to_string();
239
240 for agent in &stream.spec().agents {
242 if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
243 log::error!("Failed to add_agent {}: {}", agent.id, e);
244 }
245 }
246
247 for channel in &stream.spec().channels {
249 self.add_channel_internal(channel.clone())
250 .unwrap_or_else(|e| {
251 log::error!("Failed to add_channel {}: {}", channel.source, e);
252 });
253 }
254
255 let mut streams = self.streams.lock().unwrap();
257 if streams.contains_key(&id) {
258 return Err(AgentError::DuplicateId(id.into()));
259 }
260 streams.insert(id.to_string(), stream);
261
262 Ok(id)
263 }
264
265 pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
267 let mut stream = {
268 let mut streams = self.streams.lock().unwrap();
269 let Some(stream) = streams.swap_remove(id) else {
270 return Err(AgentError::StreamNotFound(id.to_string()));
271 };
272 stream
273 };
274
275 stream.stop(self).await.unwrap_or_else(|e| {
276 log::error!("Failed to stop stream {}: {}", id, e);
277 });
278
279 for agent in &stream.spec().agents {
281 self.remove_agent_internal(&agent.id)
282 .await
283 .unwrap_or_else(|e| {
284 log::error!("Failed to remove_agent {}: {}", agent.id, e);
285 });
286 }
287 for channel in &stream.spec().channels {
288 self.remove_channel_internal(channel);
289 }
290
291 Ok(())
292 }
293
294 pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
296 let mut stream = {
297 let mut streams = self.streams.lock().unwrap();
298 let Some(stream) = streams.swap_remove(id) else {
299 return Err(AgentError::StreamNotFound(id.to_string()));
300 };
301 stream
302 };
303
304 stream.start(self).await?;
305
306 let mut streams = self.streams.lock().unwrap();
307 streams.insert(id.to_string(), stream);
308 Ok(())
309 }
310
311 pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
313 let mut stream = {
314 let mut streams = self.streams.lock().unwrap();
315 let Some(stream) = streams.swap_remove(id) else {
316 return Err(AgentError::StreamNotFound(id.to_string()));
317 };
318 stream
319 };
320
321 stream.stop(self).await?;
322
323 let mut streams = self.streams.lock().unwrap();
324 streams.insert(id.to_string(), stream);
325 Ok(())
326 }
327
328 pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
332 let def = self
333 .get_agent_definition(def_name)
334 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
335 Ok(def.to_spec())
336 }
337
338 pub fn add_agent(&self, stream_id: String, mut spec: AgentSpec) -> Result<String, AgentError> {
340 let mut streams = self.streams.lock().unwrap();
341 let Some(stream) = streams.get_mut(&stream_id) else {
342 return Err(AgentError::StreamNotFound(stream_id.to_string()));
343 };
344 let id = new_id();
345 spec.id = id.clone();
346 self.add_agent_internal(stream_id, spec.clone())?;
347 stream.spec_mut().add_agent(spec.clone());
348 Ok(id)
349 }
350
351 fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
352 let mut agents = self.agents.lock().unwrap();
353 if agents.contains_key(&spec.id) {
354 return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
355 }
356 let spec_id = spec.id.clone();
357 let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
358 agent.set_stream_id(stream_id);
359 agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
360 Ok(())
361 }
362
363 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
365 let agents = self.agents.lock().unwrap();
366 agents.get(agent_id).cloned()
367 }
368
369 pub fn add_channel(&self, stream_id: &str, channel: ChannelSpec) -> Result<(), AgentError> {
371 {
373 let agents = self.agents.lock().unwrap();
374 if !agents.contains_key(&channel.source) {
375 return Err(AgentError::AgentNotFound(channel.source.to_string()));
376 }
377 if !agents.contains_key(&channel.target) {
378 return Err(AgentError::AgentNotFound(channel.target.to_string()));
379 }
380 }
381
382 if channel.source_handle.is_empty() {
384 return Err(AgentError::EmptySourceHandle);
385 }
386 if channel.target_handle.is_empty() {
387 return Err(AgentError::EmptyTargetHandle);
388 }
389
390 let mut streams = self.streams.lock().unwrap();
391 let Some(stream) = streams.get_mut(stream_id) else {
392 return Err(AgentError::StreamNotFound(stream_id.to_string()));
393 };
394 stream.spec_mut().add_channels(channel.clone());
395 self.add_channel_internal(channel)?;
396 Ok(())
397 }
398
399 fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
400 let mut channels = self.channels.lock().unwrap();
401 if let Some(targets) = channels.get_mut(&channel.source) {
402 if targets
403 .iter()
404 .any(|(target, source_handle, target_handle)| {
405 *target == channel.target
406 && *source_handle == channel.source_handle
407 && *target_handle == channel.target_handle
408 })
409 {
410 return Err(AgentError::ChannelAlreadyExists);
411 }
412 targets.push((channel.target, channel.source_handle, channel.target_handle));
413 } else {
414 channels.insert(
415 channel.source,
416 vec![(channel.target, channel.source_handle, channel.target_handle)],
417 );
418 }
419 Ok(())
420 }
421
422 pub fn add_agents_and_channels(
426 &self,
427 stream_id: &str,
428 agents: &Vector<AgentSpec>,
429 channels: &Vector<ChannelSpec>,
430 ) -> Result<(Vector<AgentSpec>, Vector<ChannelSpec>), AgentError> {
431 let (agents, channels) = update_ids(agents, channels);
432
433 let mut streams = self.streams.lock().unwrap();
434 let Some(stream) = streams.get_mut(stream_id) else {
435 return Err(AgentError::StreamNotFound(stream_id.to_string()));
436 };
437
438 for agent in &agents {
439 self.add_agent_internal(stream_id.to_string(), agent.clone())?;
440 stream.spec_mut().add_agent(agent.clone());
441 }
442
443 for channel in &channels {
444 self.add_channel_internal(channel.clone())?;
445 stream.spec_mut().add_channels(channel.clone());
446 }
447
448 Ok((agents, channels))
449 }
450
451 pub async fn remove_agent(&self, stream_id: &str, agent_id: &str) -> Result<(), AgentError> {
453 {
454 let mut streams = self.streams.lock().unwrap();
455 let Some(stream) = streams.get_mut(stream_id) else {
456 return Err(AgentError::StreamNotFound(stream_id.to_string()));
457 };
458 stream.spec_mut().remove_agent(agent_id);
459 }
460 if let Err(e) = self.remove_agent_internal(agent_id).await {
461 return Err(e);
462 }
463 Ok(())
464 }
465
466 async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
467 self.stop_agent(agent_id).await?;
468
469 {
471 let mut channels = self.channels.lock().unwrap();
472 let mut sources_to_remove = Vec::new();
473 for (source, targets) in channels.iter_mut() {
474 targets.retain(|(target, _, _)| target != agent_id);
475 if targets.is_empty() {
476 sources_to_remove.push(source.clone());
477 }
478 }
479 for source in sources_to_remove {
480 channels.swap_remove(&source);
481 }
482 channels.swap_remove(agent_id);
483 }
484
485 {
487 let mut agents = self.agents.lock().unwrap();
488 agents.swap_remove(agent_id);
489 }
490
491 Ok(())
492 }
493
494 pub fn remove_channel(&self, stream_id: &str, channel: &ChannelSpec) -> Result<(), AgentError> {
496 let mut stream = {
497 let mut streams = self.streams.lock().unwrap();
498 let Some(stream) = streams.swap_remove(stream_id) else {
499 return Err(AgentError::StreamNotFound(stream_id.to_string()));
500 };
501 stream
502 };
503
504 let Some(channel) = stream.spec_mut().remove_channel(channel) else {
505 let mut streams = self.streams.lock().unwrap();
506 streams.insert(stream_id.to_string(), stream);
507 return Err(AgentError::ChannelNotFound(format!(
508 "{}:{}->{}:{}",
509 channel.source, channel.source_handle, channel.target, channel.target_handle
510 )));
511 };
512 let mut streams = self.streams.lock().unwrap();
513 streams.insert(stream_id.to_string(), stream);
514
515 self.remove_channel_internal(&channel);
516 Ok(())
517 }
518
519 fn remove_channel_internal(&self, channel: &ChannelSpec) {
520 let mut channels = self.channels.lock().unwrap();
521 if let Some(targets) = channels.get_mut(&channel.source) {
522 targets.retain(|(target, source_handle, target_handle)| {
523 *target != channel.target
524 || *source_handle != channel.source_handle
525 || *target_handle != channel.target_handle
526 });
527 if targets.is_empty() {
528 channels.swap_remove(&channel.source);
529 }
530 }
531 }
532
533 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
535 let agent = {
536 let agents = self.agents.lock().unwrap();
537 let Some(a) = agents.get(agent_id) else {
538 return Err(AgentError::AgentNotFound(agent_id.to_string()));
539 };
540 a.clone()
541 };
542 let def_name = {
543 let agent = agent.lock().await;
544 agent.def_name().to_string()
545 };
546 let uses_native_thread = {
547 let defs = self.defs.lock().unwrap();
548 let Some(def) = defs.get(&def_name) else {
549 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
550 };
551 def.native_thread
552 };
553 let agent_status = {
554 let agent = agent.lock().await;
556 agent.status().clone()
557 };
558 if agent_status == AgentStatus::Init {
559 log::info!("Starting agent {}", agent_id);
560
561 if uses_native_thread {
562 let (tx, rx) = std::sync::mpsc::channel();
563
564 {
565 let mut agent_txs = self.agent_txs.lock().unwrap();
566 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
567 };
568
569 let agent_id = agent_id.to_string();
570 std::thread::spawn(async move || {
571 if let Err(e) = agent.lock().await.start().await {
572 log::error!("Failed to start agent {}: {}", agent_id, e);
573 }
574
575 while let Ok(message) = rx.recv() {
576 match message {
577 AgentMessage::Input { ctx, pin, value } => {
578 agent
579 .lock()
580 .await
581 .process(ctx, pin, value)
582 .await
583 .unwrap_or_else(|e| {
584 log::error!("Process Error {}: {}", agent_id, e);
585 });
586 }
587 AgentMessage::Config { key, value } => {
588 agent
589 .lock()
590 .await
591 .set_config(key, value)
592 .unwrap_or_else(|e| {
593 log::error!("Config Error {}: {}", agent_id, e);
594 });
595 }
596 AgentMessage::Configs { configs } => {
597 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
598 log::error!("Configs Error {}: {}", agent_id, e);
599 });
600 }
601 AgentMessage::Stop => {
602 break;
603 }
604 }
605 }
606 });
607 } else {
608 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
609
610 {
611 let mut agent_txs = self.agent_txs.lock().unwrap();
612 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
613 };
614
615 let agent_id = agent_id.to_string();
616 tokio::spawn(async move {
617 {
618 let mut agent_guard = agent.lock().await;
619 if let Err(e) = agent_guard.start().await {
620 log::error!("Failed to start agent {}: {}", agent_id, e);
621 }
622 }
623
624 while let Some(message) = rx.recv().await {
625 match message {
626 AgentMessage::Input { ctx, pin, value } => {
627 agent
628 .lock()
629 .await
630 .process(ctx, pin, value)
631 .await
632 .unwrap_or_else(|e| {
633 log::error!("Process Error {}: {}", agent_id, e);
634 });
635 }
636 AgentMessage::Config { key, value } => {
637 agent
638 .lock()
639 .await
640 .set_config(key, value)
641 .unwrap_or_else(|e| {
642 log::error!("Config Error {}: {}", agent_id, e);
643 });
644 }
645 AgentMessage::Configs { configs } => {
646 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
647 log::error!("Configs Error {}: {}", agent_id, e);
648 });
649 }
650 AgentMessage::Stop => {
651 rx.close();
652 return;
653 }
654 }
655 }
656 });
657 tokio::task::yield_now().await;
658 }
659 }
660 Ok(())
661 }
662
663 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
665 {
666 let mut agent_txs = self.agent_txs.lock().unwrap();
668 if let Some(tx) = agent_txs.swap_remove(agent_id) {
669 match tx {
670 AgentMessageSender::Sync(tx) => {
671 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
672 log::error!("Failed to send stop message to agent {}: {}", agent_id, e);
673 });
674 }
675 AgentMessageSender::Async(tx) => {
676 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
677 log::error!("Failed to send stop message to agent {}: {}", agent_id, e);
678 });
679 }
680 }
681 }
682 }
683
684 let agent = {
685 let agents = self.agents.lock().unwrap();
686 let Some(a) = agents.get(agent_id) else {
687 return Err(AgentError::AgentNotFound(agent_id.to_string()));
688 };
689 a.clone()
690 };
691 let mut agent_guard = agent.lock().await;
692 if *agent_guard.status() == AgentStatus::Start {
693 log::info!("Stopping agent {}", agent_id);
694 agent_guard.stop().await?;
695 }
696
697 Ok(())
698 }
699
700 pub async fn set_agent_configs(
702 &self,
703 agent_id: String,
704 configs: AgentConfigs,
705 ) -> Result<(), AgentError> {
706 let tx = {
707 let agent_txs = self.agent_txs.lock().unwrap();
708 agent_txs.get(&agent_id).cloned()
709 };
710
711 let Some(tx) = tx else {
712 let agent = {
714 let agents = self.agents.lock().unwrap();
715 let Some(a) = agents.get(&agent_id) else {
716 return Err(AgentError::AgentNotFound(agent_id.to_string()));
717 };
718 a.clone()
719 };
720 agent.lock().await.set_configs(configs.clone())?;
721 return Ok(());
722 };
723 let message = AgentMessage::Configs { configs };
724 match tx {
725 AgentMessageSender::Sync(tx) => {
726 tx.send(message).map_err(|_| {
727 AgentError::SendMessageFailed("Failed to send config message".to_string())
728 })?;
729 }
730 AgentMessageSender::Async(tx) => {
731 tx.send(message).await.map_err(|_| {
732 AgentError::SendMessageFailed("Failed to send config message".to_string())
733 })?;
734 }
735 }
736 Ok(())
737 }
738
739 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
741 let global_configs_map = self.global_configs_map.lock().unwrap();
742 global_configs_map.get(def_name).cloned()
743 }
744
745 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
747 let mut global_configs_map = self.global_configs_map.lock().unwrap();
748
749 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
750 global_configs_map.insert(def_name, configs);
751 return;
752 };
753
754 for (key, value) in configs {
755 existing_configs.set(key, value);
756 }
757 }
758
759 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
761 let global_configs_map = self.global_configs_map.lock().unwrap();
762 global_configs_map.clone()
763 }
764
765 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
767 for (agent_name, new_configs) in new_configs_map {
768 self.set_global_configs(agent_name, new_configs);
769 }
770 }
771
772 pub(crate) async fn agent_input(
774 &self,
775 agent_id: String,
776 ctx: AgentContext,
777 pin: String,
778 value: AgentValue,
779 ) -> Result<(), AgentError> {
780 let message = if pin.starts_with("config:") {
781 let config_key = pin[7..].to_string();
782 AgentMessage::Config {
783 key: config_key,
784 value,
785 }
786 } else {
787 AgentMessage::Input {
788 ctx,
789 pin: pin.clone(),
790 value,
791 }
792 };
793
794 let tx = {
795 let agent_txs = self.agent_txs.lock().unwrap();
796 agent_txs.get(&agent_id).cloned()
797 };
798
799 let Some(tx) = tx else {
800 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
802 let agents = self.agents.lock().unwrap();
803 let Some(a) = agents.get(&agent_id) else {
804 return Err(AgentError::AgentNotFound(agent_id.to_string()));
805 };
806 a.clone()
807 };
808 if let AgentMessage::Config { key, value } = message {
809 agent.lock().await.set_config(key, value)?;
810 }
811 return Ok(());
812 };
813 match tx {
814 AgentMessageSender::Sync(tx) => {
815 tx.send(message).map_err(|_| {
816 AgentError::SendMessageFailed("Failed to send input message".to_string())
817 })?;
818 }
819 AgentMessageSender::Async(tx) => {
820 tx.send(message).await.map_err(|_| {
821 AgentError::SendMessageFailed("Failed to send input message".to_string())
822 })?;
823 }
824 }
825
826 self.emit_agent_input(agent_id.to_string(), pin);
827
828 Ok(())
829 }
830
831 pub async fn send_agent_out(
833 &self,
834 agent_id: String,
835 ctx: AgentContext,
836 pin: String,
837 value: AgentValue,
838 ) -> Result<(), AgentError> {
839 message::send_agent_out(self, agent_id, ctx, pin, value).await
840 }
841
842 pub fn try_send_agent_out(
844 &self,
845 agent_id: String,
846 ctx: AgentContext,
847 pin: String,
848 value: AgentValue,
849 ) -> Result<(), AgentError> {
850 message::try_send_agent_out(self, agent_id, ctx, pin, value)
851 }
852
853 pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
855 self.try_send_board_out(name, AgentContext::new(), value)
856 }
857
858 pub fn write_var_value(
860 &self,
861 stream_id: &str,
862 name: &str,
863 value: AgentValue,
864 ) -> Result<(), AgentError> {
865 let var_name = format!("%{}/{}", stream_id, name);
866 self.try_send_board_out(var_name, AgentContext::new(), value)
867 }
868
869 pub(crate) fn try_send_board_out(
870 &self,
871 name: String,
872 ctx: AgentContext,
873 value: AgentValue,
874 ) -> Result<(), AgentError> {
875 message::try_send_board_out(self, name, ctx, value)
876 }
877
878 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
879 let (tx, mut rx) = mpsc::channel(4096);
881 {
882 let mut tx_lock = self.tx.lock().unwrap();
883 *tx_lock = Some(tx);
884 }
885
886 let askit = self.clone();
888 tokio::spawn(async move {
889 while let Some(message) = rx.recv().await {
890 use AgentEventMessage::*;
891
892 match message {
893 AgentOut {
894 agent,
895 ctx,
896 pin,
897 value,
898 } => {
899 message::agent_out(&askit, agent, ctx, pin, value).await;
900 }
901 BoardOut { name, ctx, value } => {
902 message::board_out(&askit, name, ctx, value).await;
903 }
904 }
905 }
906 });
907
908 tokio::task::yield_now().await;
909
910 Ok(())
911 }
912
913 async fn start_agent_streams_on_start(&self) -> Result<(), AgentError> {
914 let run_on_start_stream_ids;
915 {
916 let agent_streams = self.streams.lock().unwrap();
917 run_on_start_stream_ids = agent_streams
918 .values()
919 .filter(|s| s.spec().run_on_start)
920 .map(|s| s.id().to_string())
921 .collect::<Vec<_>>();
922 }
923
924 for id in run_on_start_stream_ids {
925 self.start_agent_stream(&id).await.unwrap_or_else(|e| {
926 log::error!("Failed to start agent stream: {}", e);
927 });
928 }
929 Ok(())
930 }
931
932 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
934 let mut observers = self.observers.lock().unwrap();
935 let observer_id = new_observer_id();
936 observers.insert(observer_id, observer);
937 observer_id
938 }
939
940 pub fn unsubscribe(&self, observer_id: usize) {
942 let mut observers = self.observers.lock().unwrap();
943 observers.swap_remove(&observer_id);
944 }
945
946 pub(crate) fn emit_agent_config_updated(
947 &self,
948 agent_id: String,
949 key: String,
950 value: AgentValue,
951 ) {
952 self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
953 }
954
955 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
956 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
957 }
958
959 pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
960 self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
961 }
962
963 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
964 self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
965 }
966
967 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
968 self.notify_observers(ASKitEvent::Board(name, value));
973 }
974
975 fn notify_observers(&self, event: ASKitEvent) {
976 let observers = self.observers.lock().unwrap();
977 for (_id, observer) in observers.iter() {
978 observer.notify(&event);
979 }
980 }
981}
982
/// Checks whether `new_name` is acceptable as a stream name.
///
/// A name is rejected when it is empty or whitespace-only, when it contains any of
/// `\ : * ? " < > |`, or, if it uses `/` as a separator, when any path segment is empty
/// (leading, trailing or doubled slash), `.` or `..`.
fn is_valid_stream_name(new_name: &str) -> bool {
    if new_name.trim().is_empty() {
        return false;
    }

    // Segment rules only apply to names that actually use `/`; a bare "." stays valid.
    let has_bad_segment = new_name.contains('/')
        && new_name
            .split('/')
            .any(|segment| matches!(segment, "" | "." | ".."));
    if has_bad_segment {
        return false;
    }

    !new_name
        .chars()
        .any(|c| matches!(c, '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|'))
}
1014
1015#[derive(Clone, Debug)]
1016pub enum ASKitEvent {
1017 AgentConfigUpdated(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), AgentSpecUpdated(String), Board(String, AgentValue), }
1023
1024pub trait ASKitObserver {
1025 fn notify(&self, event: &ASKitEvent);
1026}
1027
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next unused observer id and advances the counter by one.
fn new_observer_id() -> usize {
    let issued = OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    issued
}
1033
1034#[derive(Clone)]
1037pub enum AgentMessageSender {
1038 Sync(std::sync::mpsc::Sender<AgentMessage>),
1039 Async(mpsc::Sender<AgentMessage>),
1040}