1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::{Mutex as AsyncMutex, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::message::{self, AgentEventMessage};
13use crate::registry;
14use crate::spec::{self, AgentSpec, AgentStreamSpec, ChannelSpec};
15use crate::stream::{AgentStream, AgentStreamInfo, AgentStreams};
16use crate::value::AgentValue;
17
/// Capacity of the bounded Tokio channel that `start_agent` creates for every
/// agent that runs on a Tokio task (i.e. not on a native thread).
const MESSAGE_LIMIT: usize = 1_024;
19
20#[derive(Clone)]
21pub struct ASKit {
22 pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
24
25 pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,
27
28 pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
30
31 pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
33
34 pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
36
37 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
39
40 pub(crate) streams: Arc<Mutex<AgentStreams>>,
42
43 pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
45
46 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
48
49 pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
51}
52
53impl ASKit {
54 pub fn new() -> Self {
55 Self {
56 agents: Default::default(),
57 agent_txs: Default::default(),
58 board_out_agents: Default::default(),
59 board_value: Default::default(),
60 channels: Default::default(),
61 defs: Default::default(),
62 streams: Default::default(),
63 global_configs_map: Default::default(),
64 tx: Arc::new(Mutex::new(None)),
65 observers: Default::default(),
66 }
67 }
68
69 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
70 self.tx
71 .lock()
72 .unwrap()
73 .clone()
74 .ok_or(AgentError::TxNotInitialized)
75 }
76
77 pub fn init() -> Result<Self, AgentError> {
78 let askit = Self::new();
79 askit.register_agents();
80 Ok(askit)
81 }
82
83 fn register_agents(&self) {
84 registry::register_inventory_agents(self);
85 }
86
87 pub async fn ready(&self) -> Result<(), AgentError> {
88 self.spawn_message_loop().await?;
89 self.start_agent_streams_on_start().await?;
90 Ok(())
91 }
92
93 pub fn quit(&self) {
94 let mut tx_lock = self.tx.lock().unwrap();
95 *tx_lock = None;
96 }
97
98 pub fn register_agent_definiton(&self, def: AgentDefinition) {
99 let def_name = def.name.clone();
100 let def_global_configs = def.global_configs.clone();
101
102 let mut defs = self.defs.lock().unwrap();
103 defs.insert(def.name.clone(), def);
104
105 if let Some(def_global_configs) = def_global_configs {
107 let mut new_configs = AgentConfigs::default();
108 for (key, config_entry) in def_global_configs.iter() {
109 new_configs.set(key.clone(), config_entry.value.clone());
110 }
111 self.set_global_configs(def_name, new_configs);
112 }
113 }
114
115 pub fn get_agent_definitions(&self) -> AgentDefinitions {
116 let defs = self.defs.lock().unwrap();
117 defs.clone()
118 }
119
120 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
121 let defs = self.defs.lock().unwrap();
122 defs.get(def_name).cloned()
123 }
124
125 pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
126 let defs = self.defs.lock().unwrap();
127 let Some(def) = defs.get(def_name) else {
128 return None;
129 };
130 def.configs.clone()
131 }
132
133 pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
134 let agents = self.agents.lock().unwrap();
135 let Some(agent) = agents.get(agent_id) else {
136 return None;
137 };
138 let agent = agent.blocking_lock();
139 Some(agent.spec().clone())
140 }
141
142 pub fn get_agent_stream_info(&self, id: &str) -> Option<AgentStreamInfo> {
146 let streams = self.streams.lock().unwrap();
147 streams.get(id).map(|stream| stream.into())
148 }
149
150 pub fn get_agent_stream_infos(&self) -> Vec<AgentStreamInfo> {
152 let streams = self.streams.lock().unwrap();
153 streams.values().map(|s| s.into()).collect()
154 }
155
156 pub fn get_agent_stream_spec(&self, id: &str) -> Option<AgentStreamSpec> {
158 let streams = self.streams.lock().unwrap();
159 streams.get(id).map(|stream| stream.spec().clone())
160 }
161
162 pub fn set_agent_stream_spec(&self, id: &str, spec: AgentStreamSpec) -> Result<(), AgentError> {
164 let mut streams = self.streams.lock().unwrap();
165 let Some(stream) = streams.get_mut(id) else {
166 return Err(AgentError::StreamNotFound(id.to_string()));
167 };
168 *stream.spec_mut() = spec;
169 Ok(())
170 }
171
172 pub fn new_agent_stream(&self, name: &str) -> Result<String, AgentError> {
176 if !is_valid_stream_name(name) {
177 return Err(AgentError::InvalidStreamName(name.into()));
178 }
179 let new_name = self.unique_stream_name(name);
180 let spec = AgentStreamSpec::default();
181 let id = self.add_agent_stream(new_name, spec)?;
182 Ok(id)
183 }
184
185 pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
186 if !is_valid_stream_name(new_name) {
187 return Err(AgentError::InvalidStreamName(new_name.into()));
188 }
189
190 let new_name = self.unique_stream_name(new_name);
192
193 let mut streams = self.streams.lock().unwrap();
194
195 let Some(mut stream) = streams.swap_remove(id) else {
197 return Err(AgentError::RenameStreamFailed(id.into()));
198 };
199
200 stream.set_name(new_name.clone());
202 streams.insert(stream.id().to_string(), stream);
203 Ok(new_name)
204 }
205
206 pub fn unique_stream_name(&self, name: &str) -> String {
207 let mut new_name = name.trim().to_string();
208 let mut i = 2;
209 let streams = self.streams.lock().unwrap();
210 while streams.values().any(|stream| stream.name() == new_name) {
211 new_name = format!("{}{}", name, i);
212 i += 1;
213 }
214 new_name
215 }
216
217 pub fn add_agent_stream(
218 &self,
219 name: String,
220 spec: AgentStreamSpec,
221 ) -> Result<String, AgentError> {
222 let stream = AgentStream::new(name, spec);
223 let id = stream.id().to_string();
224
225 for agent in &stream.spec().agents {
227 if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
228 log::error!("Failed to add_agent {}: {}", agent.id, e);
229 }
230 }
231
232 for channel in &stream.spec().channels {
234 self.add_channel_internal(channel.clone())
235 .unwrap_or_else(|e| {
236 log::error!("Failed to add_channel {}: {}", channel.source, e);
237 });
238 }
239
240 let mut streams = self.streams.lock().unwrap();
242 if streams.contains_key(&id) {
243 return Err(AgentError::DuplicateId(id.into()));
244 }
245 streams.insert(id.to_string(), stream);
246
247 Ok(id)
248 }
249
250 pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
251 let mut stream = {
252 let mut streams = self.streams.lock().unwrap();
253 let Some(stream) = streams.swap_remove(id) else {
254 return Err(AgentError::StreamNotFound(id.to_string()));
255 };
256 stream
257 };
258
259 stream.stop(self).await.unwrap_or_else(|e| {
260 log::error!("Failed to stop stream {}: {}", id, e);
261 });
262
263 for agent in &stream.spec().agents {
265 self.remove_agent_internal(&agent.id)
266 .await
267 .unwrap_or_else(|e| {
268 log::error!("Failed to remove_agent {}: {}", agent.id, e);
269 });
270 }
271 for channel in &stream.spec().channels {
272 self.remove_channel_internal(channel);
273 }
274
275 Ok(())
276 }
277
278 pub fn copy_sub_stream(
279 &self,
280 agents: &Vec<AgentSpec>,
281 channels: &Vec<ChannelSpec>,
282 ) -> (Vec<AgentSpec>, Vec<ChannelSpec>) {
283 spec::copy_sub_stream(agents, channels)
284 }
285
286 pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
287 let mut stream = {
288 let mut streams = self.streams.lock().unwrap();
289 let Some(stream) = streams.swap_remove(id) else {
290 return Err(AgentError::StreamNotFound(id.to_string()));
291 };
292 stream
293 };
294
295 stream.start(self).await?;
296
297 let mut streams = self.streams.lock().unwrap();
298 streams.insert(id.to_string(), stream);
299 Ok(())
300 }
301
302 pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
303 let mut stream = {
304 let mut streams = self.streams.lock().unwrap();
305 let Some(stream) = streams.swap_remove(id) else {
306 return Err(AgentError::StreamNotFound(id.to_string()));
307 };
308 stream
309 };
310
311 stream.stop(self).await?;
312
313 let mut streams = self.streams.lock().unwrap();
314 streams.insert(id.to_string(), stream);
315 Ok(())
316 }
317
318 pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
322 let def = self
323 .get_agent_definition(def_name)
324 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
325 Ok(AgentSpec::from_def(&def))
326 }
327
328 pub fn add_agent(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
330 let mut streams = self.streams.lock().unwrap();
331 let Some(stream) = streams.get_mut(&stream_id) else {
332 return Err(AgentError::StreamNotFound(stream_id.to_string()));
333 };
334 self.add_agent_internal(stream_id, spec.clone())?;
335 stream.spec_mut().add_agent(spec.clone());
336 Ok(())
337 }
338
339 fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
340 let mut agents = self.agents.lock().unwrap();
341 if agents.contains_key(&spec.id) {
342 return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
343 }
344 let spec_id = spec.id.clone();
345 let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
346 agent.set_stream_id(stream_id);
347 agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
348 Ok(())
349 }
350
351 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
353 let agents = self.agents.lock().unwrap();
354 agents.get(agent_id).cloned()
355 }
356
357 pub fn add_channel(&self, stream_id: &str, channel: ChannelSpec) -> Result<(), AgentError> {
358 let mut streams = self.streams.lock().unwrap();
359 let Some(stream) = streams.get_mut(stream_id) else {
360 return Err(AgentError::StreamNotFound(stream_id.to_string()));
361 };
362 stream.spec_mut().add_channels(channel.clone());
363 self.add_channel_internal(channel)?;
364 Ok(())
365 }
366
367 fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
368 {
370 let agents = self.agents.lock().unwrap();
371 if !agents.contains_key(&channel.source) {
372 return Err(AgentError::SourceAgentNotFound(channel.source.to_string()));
373 }
374 }
375
376 if channel.source_handle.is_empty() {
378 return Err(AgentError::EmptySourceHandle);
379 }
380 if channel.target_handle.is_empty() {
381 return Err(AgentError::EmptyTargetHandle);
382 }
383
384 let mut channels = self.channels.lock().unwrap();
385 if let Some(targets) = channels.get_mut(&channel.source) {
386 if targets
387 .iter()
388 .any(|(target, source_handle, target_handle)| {
389 *target == channel.target
390 && *source_handle == channel.source_handle
391 && *target_handle == channel.target_handle
392 })
393 {
394 return Err(AgentError::ChannelAlreadyExists);
395 }
396 targets.push((channel.target, channel.source_handle, channel.target_handle));
397 } else {
398 channels.insert(
399 channel.source,
400 vec![(channel.target, channel.source_handle, channel.target_handle)],
401 );
402 }
403 Ok(())
404 }
405
406 pub async fn remove_agent(&self, stream_id: &str, agent_id: &str) -> Result<(), AgentError> {
407 {
408 let mut streams = self.streams.lock().unwrap();
409 let Some(stream) = streams.get_mut(stream_id) else {
410 return Err(AgentError::StreamNotFound(stream_id.to_string()));
411 };
412 stream.spec_mut().remove_agent(agent_id);
413 }
414 if let Err(e) = self.remove_agent_internal(agent_id).await {
415 return Err(e);
416 }
417 Ok(())
418 }
419
420 async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
421 self.stop_agent(agent_id).await?;
422
423 {
425 let mut channels = self.channels.lock().unwrap();
426 let mut sources_to_remove = Vec::new();
427 for (source, targets) in channels.iter_mut() {
428 targets.retain(|(target, _, _)| target != agent_id);
429 if targets.is_empty() {
430 sources_to_remove.push(source.clone());
431 }
432 }
433 for source in sources_to_remove {
434 channels.swap_remove(&source);
435 }
436 channels.swap_remove(agent_id);
437 }
438
439 {
441 let mut agents = self.agents.lock().unwrap();
442 agents.swap_remove(agent_id);
443 }
444
445 Ok(())
446 }
447
448 pub fn remove_channel(&self, stream_id: &str, channel_id: &str) -> Result<(), AgentError> {
449 let mut stream = {
450 let mut streams = self.streams.lock().unwrap();
451 let Some(stream) = streams.swap_remove(stream_id) else {
452 return Err(AgentError::StreamNotFound(stream_id.to_string()));
453 };
454 stream
455 };
456
457 let Some(channel) = stream.spec_mut().remove_channel(channel_id) else {
458 let mut streams = self.streams.lock().unwrap();
459 streams.insert(stream_id.to_string(), stream);
460 return Err(AgentError::ChannelNotFound(channel_id.to_string()));
461 };
462 let mut streams = self.streams.lock().unwrap();
463 streams.insert(stream_id.to_string(), stream);
464
465 self.remove_channel_internal(&channel);
466 Ok(())
467 }
468
469 fn remove_channel_internal(&self, channel: &ChannelSpec) {
470 let mut channels = self.channels.lock().unwrap();
471 if let Some(targets) = channels.get_mut(&channel.source) {
472 targets.retain(|(target, source_handle, target_handle)| {
473 *target != channel.target
474 || *source_handle != channel.source_handle
475 || *target_handle != channel.target_handle
476 });
477 if targets.is_empty() {
478 channels.swap_remove(&channel.source);
479 }
480 }
481 }
482
483 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
484 let agent = {
485 let agents = self.agents.lock().unwrap();
486 let Some(a) = agents.get(agent_id) else {
487 return Err(AgentError::AgentNotFound(agent_id.to_string()));
488 };
489 a.clone()
490 };
491 let def_name = {
492 let agent = agent.lock().await;
493 agent.def_name().to_string()
494 };
495 let uses_native_thread = {
496 let defs = self.defs.lock().unwrap();
497 let Some(def) = defs.get(&def_name) else {
498 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
499 };
500 def.native_thread
501 };
502 let agent_status = {
503 let agent = agent.lock().await;
504 agent.status().clone()
505 };
506 if agent_status == AgentStatus::Init {
507 log::info!("Starting agent {}", agent_id);
508
509 if uses_native_thread {
510 let (tx, rx) = std::sync::mpsc::channel();
511
512 {
513 let mut agent_txs = self.agent_txs.lock().unwrap();
514 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
515 };
516
517 let agent_id = agent_id.to_string();
518 std::thread::spawn(async move || {
519 if let Err(e) = agent.lock().await.start().await {
520 log::error!("Failed to start agent {}: {}", agent_id, e);
521 }
522
523 while let Ok(message) = rx.recv() {
524 match message {
525 AgentMessage::Input { ctx, pin, value } => {
526 agent
527 .lock()
528 .await
529 .process(ctx, pin, value)
530 .await
531 .unwrap_or_else(|e| {
532 log::error!("Process Error {}: {}", agent_id, e);
533 });
534 }
535 AgentMessage::Config { configs } => {
536 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
537 log::error!("Config Error {}: {}", agent_id, e);
538 });
539 }
540 AgentMessage::Stop => {
541 break;
542 }
543 }
544 }
545 });
546 } else {
547 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
548
549 {
550 let mut agent_txs = self.agent_txs.lock().unwrap();
551 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
552 };
553
554 let agent_id = agent_id.to_string();
555 tokio::spawn(async move {
556 {
557 let mut agent_guard = agent.lock().await;
558 if let Err(e) = agent_guard.start().await {
559 log::error!("Failed to start agent {}: {}", agent_id, e);
560 }
561 }
562
563 while let Some(message) = rx.recv().await {
564 match message {
565 AgentMessage::Input { ctx, pin, value } => {
566 agent
567 .lock()
568 .await
569 .process(ctx, pin, value)
570 .await
571 .unwrap_or_else(|e| {
572 log::error!("Process Error {}: {}", agent_id, e);
573 });
574 }
575 AgentMessage::Config { configs } => {
576 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
577 log::error!("Config Error {}: {}", agent_id, e);
578 });
579 }
580 AgentMessage::Stop => {
581 rx.close();
582 return;
583 }
584 }
585 }
586 });
587 tokio::task::yield_now().await;
588 }
589 }
590 Ok(())
591 }
592
593 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
594 let agent = {
595 let agents = self.agents.lock().unwrap();
596 let Some(a) = agents.get(agent_id) else {
597 return Err(AgentError::AgentNotFound(agent_id.to_string()));
598 };
599 a.clone()
600 };
601
602 let agent_status = {
603 let agent = agent.lock().await;
604 agent.status().clone()
605 };
606 if agent_status == AgentStatus::Start {
607 log::info!("Stopping agent {}", agent_id);
608
609 {
610 let mut agent_txs = self.agent_txs.lock().unwrap();
611 if let Some(tx) = agent_txs.swap_remove(agent_id) {
612 match tx {
613 AgentMessageSender::Sync(tx) => {
614 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
615 log::error!(
616 "Failed to send stop message to agent {}: {}",
617 agent_id,
618 e
619 );
620 });
621 }
622 AgentMessageSender::Async(tx) => {
623 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
624 log::error!(
625 "Failed to send stop message to agent {}: {}",
626 agent_id,
627 e
628 );
629 });
630 }
631 }
632 }
633 }
634
635 agent.lock().await.stop().await?;
636 }
637
638 Ok(())
639 }
640
641 pub async fn set_agent_configs(
642 &self,
643 agent_id: String,
644 configs: AgentConfigs,
645 ) -> Result<(), AgentError> {
646 let agent = {
647 let agents = self.agents.lock().unwrap();
648 let Some(a) = agents.get(&agent_id) else {
649 return Err(AgentError::AgentNotFound(agent_id.to_string()));
650 };
651 a.clone()
652 };
653
654 let agent_status = {
655 let agent = agent.lock().await;
656 agent.status().clone()
657 };
658 if agent_status == AgentStatus::Init {
659 agent.lock().await.set_configs(configs.clone())?;
660 } else if agent_status == AgentStatus::Start {
661 let tx = {
662 let agent_txs = self.agent_txs.lock().unwrap();
663 let Some(tx) = agent_txs.get(&agent_id) else {
664 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
665 };
666 tx.clone()
667 };
668 let message = AgentMessage::Config { configs };
669 match tx {
670 AgentMessageSender::Sync(tx) => {
671 tx.send(message).map_err(|_| {
672 AgentError::SendMessageFailed("Failed to send config message".to_string())
673 })?;
674 }
675 AgentMessageSender::Async(tx) => {
676 tx.send(message).await.map_err(|_| {
677 AgentError::SendMessageFailed("Failed to send config message".to_string())
678 })?;
679 }
680 }
681 }
682 Ok(())
683 }
684
685 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
686 let global_configs_map = self.global_configs_map.lock().unwrap();
687 global_configs_map.get(def_name).cloned()
688 }
689
690 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
691 let mut global_configs_map = self.global_configs_map.lock().unwrap();
692
693 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
694 global_configs_map.insert(def_name, configs);
695 return;
696 };
697
698 for (key, value) in configs {
699 existing_configs.set(key, value);
700 }
701 }
702
703 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
704 let global_configs_map = self.global_configs_map.lock().unwrap();
705 global_configs_map.clone()
706 }
707
708 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
709 for (agent_name, new_configs) in new_configs_map {
710 self.set_global_configs(agent_name, new_configs);
711 }
712 }
713
714 pub async fn agent_input(
715 &self,
716 agent_id: String,
717 ctx: AgentContext,
718 pin: String,
719 value: AgentValue,
720 ) -> Result<(), AgentError> {
721 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
722 let agents = self.agents.lock().unwrap();
723 let Some(a) = agents.get(&agent_id) else {
724 return Err(AgentError::AgentNotFound(agent_id.to_string()));
725 };
726 a.clone()
727 };
728
729 let agent_status = {
730 let agent = agent.lock().await;
731 agent.status().clone()
732 };
733 if agent_status != AgentStatus::Start {
734 return Ok(());
735 }
736
737 if pin.starts_with("config:") {
738 let config_key = pin[7..].to_string();
739 let mut agent = agent.lock().await;
740 agent.set_config(config_key.clone(), value.clone())?;
741 return Ok(());
742 }
743
744 let message = AgentMessage::Input {
745 ctx,
746 pin: pin.clone(),
747 value,
748 };
749
750 let tx = {
751 let agent_txs = self.agent_txs.lock().unwrap();
752 let Some(tx) = agent_txs.get(&agent_id) else {
753 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
754 };
755 tx.clone()
756 };
757 match tx {
758 AgentMessageSender::Sync(tx) => {
759 tx.send(message).map_err(|_| {
760 AgentError::SendMessageFailed("Failed to send input message".to_string())
761 })?;
762 }
763 AgentMessageSender::Async(tx) => {
764 tx.send(message).await.map_err(|_| {
765 AgentError::SendMessageFailed("Failed to send input message".to_string())
766 })?;
767 }
768 }
769
770 self.emit_agent_input(agent_id.to_string(), pin);
771
772 Ok(())
773 }
774
775 pub async fn send_agent_out(
776 &self,
777 agent_id: String,
778 ctx: AgentContext,
779 pin: String,
780 value: AgentValue,
781 ) -> Result<(), AgentError> {
782 message::send_agent_out(self, agent_id, ctx, pin, value).await
783 }
784
785 pub fn try_send_agent_out(
786 &self,
787 agent_id: String,
788 ctx: AgentContext,
789 pin: String,
790 value: AgentValue,
791 ) -> Result<(), AgentError> {
792 message::try_send_agent_out(self, agent_id, ctx, pin, value)
793 }
794
795 pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
796 self.try_send_board_out(name, AgentContext::new(), value)
797 }
798
799 pub fn write_var_value(
800 &self,
801 stream_id: &str,
802 name: &str,
803 value: AgentValue,
804 ) -> Result<(), AgentError> {
805 let var_name = format!("%{}/{}", stream_id, name);
806 self.try_send_board_out(var_name, AgentContext::new(), value)
807 }
808
809 pub(crate) fn try_send_board_out(
810 &self,
811 name: String,
812 ctx: AgentContext,
813 value: AgentValue,
814 ) -> Result<(), AgentError> {
815 message::try_send_board_out(self, name, ctx, value)
816 }
817
818 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
819 let (tx, mut rx) = mpsc::channel(4096);
821 {
822 let mut tx_lock = self.tx.lock().unwrap();
823 *tx_lock = Some(tx);
824 }
825
826 let askit = self.clone();
828 tokio::spawn(async move {
829 while let Some(message) = rx.recv().await {
830 use AgentEventMessage::*;
831
832 match message {
833 AgentOut {
834 agent,
835 ctx,
836 pin,
837 value,
838 } => {
839 message::agent_out(&askit, agent, ctx, pin, value).await;
840 }
841 BoardOut { name, ctx, value } => {
842 message::board_out(&askit, name, ctx, value).await;
843 }
844 }
845 }
846 });
847
848 tokio::task::yield_now().await;
849
850 Ok(())
851 }
852
853 async fn start_agent_streams_on_start(&self) -> Result<(), AgentError> {
854 let run_on_start_stream_ids;
855 {
856 let agent_streams = self.streams.lock().unwrap();
857 run_on_start_stream_ids = agent_streams
858 .values()
859 .filter(|s| s.spec().run_on_start)
860 .map(|s| s.id().to_string())
861 .collect::<Vec<_>>();
862 }
863
864 for id in run_on_start_stream_ids {
865 self.start_agent_stream(&id).await.unwrap_or_else(|e| {
866 log::error!("Failed to start agent stream: {}", e);
867 });
868 }
869 Ok(())
870 }
871
872 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
873 let mut observers = self.observers.lock().unwrap();
874 let observer_id = new_observer_id();
875 observers.insert(observer_id, observer);
876 observer_id
877 }
878
879 pub fn unsubscribe(&self, observer_id: usize) {
880 let mut observers = self.observers.lock().unwrap();
881 observers.swap_remove(&observer_id);
882 }
883
884 pub(crate) fn emit_agent_config_updated(
885 &self,
886 agent_id: String,
887 key: String,
888 value: AgentValue,
889 ) {
890 self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
891 }
892
893 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
894 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
895 }
896
897 pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
898 self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
899 }
900
901 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
902 self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
903 }
904
905 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
906 self.notify_observers(ASKitEvent::Board(name, value));
911 }
912
913 fn notify_observers(&self, event: ASKitEvent) {
914 let observers = self.observers.lock().unwrap();
915 for (_id, observer) in observers.iter() {
916 observer.notify(&event);
917 }
918 }
919}
920
/// Returns `true` when `new_name` is acceptable as a stream name.
///
/// A name is rejected when:
/// * it is empty or consists of whitespace only;
/// * it contains `/` and starts or ends with `/`, contains `//`, or has a
///   `.` or `..` segment;
/// * it contains any of `\ : * ? " < > |`.
fn is_valid_stream_name(new_name: &str) -> bool {
    const FORBIDDEN: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    if new_name.trim().is_empty() {
        return false;
    }

    // Path rules apply only to names that actually contain a separator.
    let malformed_path = new_name.contains('/')
        && (new_name.starts_with('/')
            || new_name.ends_with('/')
            || new_name.contains("//")
            || new_name.split('/').any(|segment| matches!(segment, "." | "..")));
    if malformed_path {
        return false;
    }

    !new_name.chars().any(|c| FORBIDDEN.contains(&c))
}
952
953#[derive(Clone, Debug)]
954pub enum ASKitEvent {
955 AgentConfigUpdated(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), AgentSpecUpdated(String), Board(String, AgentValue), }
961
962pub trait ASKitObserver {
963 fn notify(&self, event: &ASKitEvent);
964}
965
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next observer id and advances the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering::Relaxed;

    // Only the atomicity of the increment matters here, not ordering with
    // other memory operations.
    OBSERVER_ID_COUNTER.fetch_add(1, Relaxed)
}
971
972#[derive(Clone)]
975pub enum AgentMessageSender {
976 Sync(std::sync::mpsc::Sender<AgentMessage>),
977 Async(mpsc::Sender<AgentMessage>),
978}