1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::{Mutex as AsyncMutex, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::message::{self, AgentEventMessage};
13use crate::registry;
14use crate::spec::{self, AgentSpec, AgentStream, AgentStreams, ChannelSpec};
15use crate::value::AgentValue;
16
/// Capacity of the bounded tokio channel created for each agent that runs as an async task.
const MESSAGE_LIMIT: usize = 1 << 10;
18
19#[derive(Clone)]
20pub struct ASKit {
21 pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
23
24 pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,
26
27 pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
29
30 pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
32
33 pub(crate) channels: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
35
36 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
38
39 pub(crate) streams: Arc<Mutex<AgentStreams>>,
41
42 pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
44
45 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
47
48 pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
50}
51
52impl ASKit {
53 pub fn new() -> Self {
54 Self {
55 agents: Default::default(),
56 agent_txs: Default::default(),
57 board_out_agents: Default::default(),
58 board_value: Default::default(),
59 channels: Default::default(),
60 defs: Default::default(),
61 streams: Default::default(),
62 global_configs_map: Default::default(),
63 tx: Arc::new(Mutex::new(None)),
64 observers: Default::default(),
65 }
66 }
67
68 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69 self.tx
70 .lock()
71 .unwrap()
72 .clone()
73 .ok_or(AgentError::TxNotInitialized)
74 }
75
76 pub fn init() -> Result<Self, AgentError> {
77 let askit = Self::new();
78 askit.register_agents();
79 Ok(askit)
80 }
81
82 fn register_agents(&self) {
83 registry::register_inventory_agents(self);
84 }
85
86 pub async fn ready(&self) -> Result<(), AgentError> {
87 self.spawn_message_loop().await?;
88 self.start_agent_streams().await?;
89 Ok(())
90 }
91
92 pub fn quit(&self) {
93 let mut tx_lock = self.tx.lock().unwrap();
94 *tx_lock = None;
95 }
96
97 pub fn register_agent_definiton(&self, def: AgentDefinition) {
98 let def_name = def.name.clone();
99 let def_global_configs = def.global_configs.clone();
100
101 let mut defs = self.defs.lock().unwrap();
102 defs.insert(def.name.clone(), def);
103
104 if let Some(def_global_configs) = def_global_configs {
106 let mut new_configs = AgentConfigs::default();
107 for (key, config_entry) in def_global_configs.iter() {
108 new_configs.set(key.clone(), config_entry.value.clone());
109 }
110 self.set_global_configs(def_name, new_configs);
111 }
112 }
113
114 pub fn get_agent_definitions(&self) -> AgentDefinitions {
115 let defs = self.defs.lock().unwrap();
116 defs.clone()
117 }
118
119 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
120 let defs = self.defs.lock().unwrap();
121 defs.get(def_name).cloned()
122 }
123
124 pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
125 let defs = self.defs.lock().unwrap();
126 let Some(def) = defs.get(def_name) else {
127 return None;
128 };
129 def.configs.clone()
130 }
131
132 pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
133 let agents = self.agents.lock().unwrap();
134 let Some(agent) = agents.get(agent_id) else {
135 return None;
136 };
137 let agent = agent.blocking_lock();
138 Some(agent.spec().clone())
139 }
140
141 pub fn get_agent_streams(&self) -> AgentStreams {
144 let streams = self.streams.lock().unwrap();
145 streams.clone()
146 }
147
148 pub fn new_agent_stream(&self, name: &str) -> Result<AgentStream, AgentError> {
149 if !Self::is_valid_stream_name(name) {
150 return Err(AgentError::InvalidStreamName(name.into()));
151 }
152
153 let new_name = self.unique_stream_name(name);
154 let mut streams = self.streams.lock().unwrap();
155 let stream = AgentStream::new(new_name.clone());
156 streams.insert(stream.id().to_string(), stream.clone());
157 Ok(stream)
158 }
159
160 pub fn rename_agent_stream(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
161 if !Self::is_valid_stream_name(new_name) {
162 return Err(AgentError::InvalidStreamName(new_name.into()));
163 }
164
165 let new_name = self.unique_stream_name(new_name);
167
168 let mut streams = self.streams.lock().unwrap();
169
170 let Some(mut stream) = streams.swap_remove(id) else {
172 return Err(AgentError::RenameStreamFailed(id.into()));
173 };
174
175 stream.set_name(new_name.clone());
177 streams.insert(stream.id().to_string(), stream);
178 Ok(new_name)
179 }
180
/// Checks whether `new_name` may be used as a stream name.
///
/// A name is rejected when it
/// * is empty or whitespace only,
/// * contains `/` and starts or ends with `/`, contains `//`, or has a `.` / `..`
///   path segment, or
/// * contains any of `\ : * ? " < > |`.
fn is_valid_stream_name(new_name: &str) -> bool {
    const FORBIDDEN: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    if new_name.trim().is_empty() {
        return false;
    }

    // Path rules apply only when the name really is a path; a bare "." stays valid.
    let malformed_path = new_name.contains('/')
        && (new_name.starts_with('/')
            || new_name.ends_with('/')
            || new_name.contains("//")
            || new_name.split('/').any(|part| matches!(part, "." | "..")));
    if malformed_path {
        return false;
    }

    !new_name.chars().any(|c| FORBIDDEN.contains(&c))
}
212
213 pub fn unique_stream_name(&self, name: &str) -> String {
214 let mut new_name = name.trim().to_string();
215 let mut i = 2;
216 let streams = self.streams.lock().unwrap();
217 while streams.values().any(|stream| stream.name() == new_name) {
218 new_name = format!("{}{}", name, i);
219 i += 1;
220 }
221 new_name
222 }
223
224 pub fn add_agent_stream(&self, agent_stream: &AgentStream) -> Result<(), AgentError> {
225 let id = agent_stream.id();
226
227 {
229 let mut streams = self.streams.lock().unwrap();
230 if streams.contains_key(id) {
231 return Err(AgentError::DuplicateId(id.into()));
232 }
233 streams.insert(id.to_string(), agent_stream.clone());
234 }
235
236 for agent in agent_stream.agents().iter() {
238 if let Err(e) = self.add_agent_internal(id.to_string(), agent.clone()) {
239 log::error!("Failed to add_agent {}: {}", agent.id, e);
240 }
241 }
242
243 for channel in agent_stream.channels().iter() {
245 self.add_channel_internal(channel.clone()).unwrap_or_else(|e| {
246 log::error!("Failed to add_channel {}: {}", channel.source, e);
247 });
248 }
249
250 Ok(())
251 }
252
253 pub async fn remove_agent_stream(&self, id: &str) -> Result<(), AgentError> {
254 let stream = {
255 let mut streams = self.streams.lock().unwrap();
256 let Some(stream) = streams.swap_remove(id) else {
257 return Err(AgentError::StreamNotFound(id.to_string()));
258 };
259 stream.clone()
260 };
261
262 stream.stop(self).await?;
263
264 for agent in stream.agents() {
266 self.remove_agent_internal(&agent.id).await?;
267 }
268 for channel in stream.channels() {
269 self.remove_channel_internal(channel);
270 }
271
272 Ok(())
273 }
274
275 pub fn insert_agent_stream(&self, stream: AgentStream) -> Result<(), AgentError> {
276 let stream_id = stream.id();
277
278 let mut streams = self.streams.lock().unwrap();
279 streams.insert(stream_id.to_string(), stream);
280 Ok(())
281 }
282
283 pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
285 let def = self
286 .get_agent_definition(def_name)
287 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
288 Ok(AgentSpec::from_def(&def))
289 }
290
291 pub fn add_agent(
293 &self,
294 stream_id: String,
295 spec: AgentSpec,
296 ) -> Result<(), AgentError> {
297 let mut streams = self.streams.lock().unwrap();
298 let Some(stream) = streams.get_mut(&stream_id) else {
299 return Err(AgentError::StreamNotFound(stream_id.to_string()));
300 };
301 self.add_agent_internal(stream_id, spec.clone())?;
302 stream.add_agent(spec.clone());
303 Ok(())
304 }
305
306 fn add_agent_internal(&self, stream_id: String, spec: AgentSpec) -> Result<(), AgentError> {
307 let mut agents = self.agents.lock().unwrap();
308 if agents.contains_key(&spec.id) {
309 return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
310 }
311 let spec_id = spec.id.clone();
312 let mut agent = agent_new(self.clone(), spec_id.clone(), spec)?;
313 agent.set_stream_id(stream_id);
314 agents.insert(spec_id, Arc::new(AsyncMutex::new(agent)));
315 Ok(())
316 }
317
318 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
320 let agents = self.agents.lock().unwrap();
321 agents.get(agent_id).cloned()
322 }
323
324 pub fn add_channel(
325 &self,
326 stream_id: &str,
327 channel: ChannelSpec,
328 ) -> Result<(), AgentError> {
329 let mut streams = self.streams.lock().unwrap();
330 let Some(stream) = streams.get_mut(stream_id) else {
331 return Err(AgentError::StreamNotFound(stream_id.to_string()));
332 };
333 stream.add_channels(channel.clone());
334 self.add_channel_internal(channel)?;
335 Ok(())
336 }
337
338 fn add_channel_internal(&self, channel: ChannelSpec) -> Result<(), AgentError> {
339 {
341 let agents = self.agents.lock().unwrap();
342 if !agents.contains_key(&channel.source) {
343 return Err(AgentError::SourceAgentNotFound(channel.source.to_string()));
344 }
345 }
346
347 if channel.source_handle.is_empty() {
349 return Err(AgentError::EmptySourceHandle);
350 }
351 if channel.target_handle.is_empty() {
352 return Err(AgentError::EmptyTargetHandle);
353 }
354
355 let mut channels = self.channels.lock().unwrap();
356 if let Some(targets) = channels.get_mut(&channel.source) {
357 if targets
358 .iter()
359 .any(|(target, source_handle, target_handle)| {
360 *target == channel.target
361 && *source_handle == channel.source_handle
362 && *target_handle == channel.target_handle
363 })
364 {
365 return Err(AgentError::ChannelAlreadyExists);
366 }
367 targets.push((
368 channel.target,
369 channel.source_handle,
370 channel.target_handle,
371 ));
372 } else {
373 channels.insert(
374 channel.source,
375 vec![(
376 channel.target,
377 channel.source_handle,
378 channel.target_handle,
379 )],
380 );
381 }
382 Ok(())
383 }
384
385 pub async fn remove_agent(
386 &self,
387 stream_id: &str,
388 agent_id: &str,
389 ) -> Result<(), AgentError> {
390 {
391 let mut streams = self.streams.lock().unwrap();
392 let Some(stream) = streams.get_mut(stream_id) else {
393 return Err(AgentError::StreamNotFound(stream_id.to_string()));
394 };
395 stream.remove_agent(agent_id);
396 }
397 self.remove_agent_internal(agent_id).await?;
398 Ok(())
399 }
400
401 async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
402 self.stop_agent(agent_id).await?;
403
404 {
406 let mut channels = self.channels.lock().unwrap();
407 let mut sources_to_remove = Vec::new();
408 for (source, targets) in channels.iter_mut() {
409 targets.retain(|(target, _, _)| target != agent_id);
410 if targets.is_empty() {
411 sources_to_remove.push(source.clone());
412 }
413 }
414 for source in sources_to_remove {
415 channels.swap_remove(&source);
416 }
417 channels.swap_remove(agent_id);
418 }
419
420 {
422 let mut agents = self.agents.lock().unwrap();
423 agents.swap_remove(agent_id);
424 }
425
426 Ok(())
427 }
428
429 pub fn remove_channel(
430 &self,
431 stream_id: &str,
432 channel_id: &str,
433 ) -> Result<(), AgentError> {
434 let mut streams = self.streams.lock().unwrap();
435 let Some(stream) = streams.get_mut(stream_id) else {
436 return Err(AgentError::StreamNotFound(stream_id.to_string()));
437 };
438 let Some(channel) = stream.remove_channel(channel_id) else {
439 return Err(AgentError::ChannelNotFound(channel_id.to_string()));
440 };
441 self.remove_channel_internal(&channel);
442 Ok(())
443 }
444
445 fn remove_channel_internal(&self, channel: &ChannelSpec) {
446 let mut channels = self.channels.lock().unwrap();
447 if let Some(targets) = channels.get_mut(&channel.source) {
448 targets.retain(|(target, source_handle, target_handle)| {
449 *target != channel.target
450 || *source_handle != channel.source_handle
451 || *target_handle != channel.target_handle
452 });
453 if targets.is_empty() {
454 channels.swap_remove(&channel.source);
455 }
456 }
457 }
458
459 pub fn copy_sub_stream(
460 &self,
461 agents: &Vec<AgentSpec>,
462 channels: &Vec<ChannelSpec>,
463 ) -> (Vec<AgentSpec>, Vec<ChannelSpec>) {
464 spec::copy_sub_stream(agents, channels)
465 }
466
467 pub async fn start_agent_stream(&self, id: &str) -> Result<(), AgentError> {
468 let stream = {
469 let streams = self.streams.lock().unwrap();
470 let Some(stream) = streams.get(id) else {
471 return Err(AgentError::StreamNotFound(id.to_string()));
472 };
473 stream.clone()
474 };
475 stream.start(self).await?;
476 Ok(())
477 }
478
479 pub async fn stop_agent_stream(&self, id: &str) -> Result<(), AgentError> {
480 let stream = {
481 let streams = self.streams.lock().unwrap();
482 let Some(stream) = streams.get(id) else {
483 return Err(AgentError::StreamNotFound(id.to_string()));
484 };
485 stream.clone()
486 };
487 stream.stop(self).await?;
488 Ok(())
489 }
490
491 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
492 let agent = {
493 let agents = self.agents.lock().unwrap();
494 let Some(a) = agents.get(agent_id) else {
495 return Err(AgentError::AgentNotFound(agent_id.to_string()));
496 };
497 a.clone()
498 };
499 let def_name = {
500 let agent = agent.lock().await;
501 agent.def_name().to_string()
502 };
503 let uses_native_thread = {
504 let defs = self.defs.lock().unwrap();
505 let Some(def) = defs.get(&def_name) else {
506 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
507 };
508 def.native_thread
509 };
510 let agent_status = {
511 let agent = agent.lock().await;
512 agent.status().clone()
513 };
514 if agent_status == AgentStatus::Init {
515 log::info!("Starting agent {}", agent_id);
516
517 if uses_native_thread {
518 let (tx, rx) = std::sync::mpsc::channel();
519
520 {
521 let mut agent_txs = self.agent_txs.lock().unwrap();
522 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
523 };
524
525 let agent_id = agent_id.to_string();
526 std::thread::spawn(async move || {
527 if let Err(e) = agent.lock().await.start().await {
528 log::error!("Failed to start agent {}: {}", agent_id, e);
529 }
530
531 while let Ok(message) = rx.recv() {
532 match message {
533 AgentMessage::Input { ctx, pin, value } => {
534 agent
535 .lock()
536 .await
537 .process(ctx, pin, value)
538 .await
539 .unwrap_or_else(|e| {
540 log::error!("Process Error {}: {}", agent_id, e);
541 });
542 }
543 AgentMessage::Config { configs } => {
544 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
545 log::error!("Config Error {}: {}", agent_id, e);
546 });
547 }
548 AgentMessage::Stop => {
549 break;
550 }
551 }
552 }
553 });
554 } else {
555 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
556
557 {
558 let mut agent_txs = self.agent_txs.lock().unwrap();
559 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
560 };
561
562 let agent_id = agent_id.to_string();
563 tokio::spawn(async move {
564 {
565 let mut agent_guard = agent.lock().await;
566 if let Err(e) = agent_guard.start().await {
567 log::error!("Failed to start agent {}: {}", agent_id, e);
568 }
569 }
570
571 while let Some(message) = rx.recv().await {
572 match message {
573 AgentMessage::Input { ctx, pin, value } => {
574 agent
575 .lock()
576 .await
577 .process(ctx, pin, value)
578 .await
579 .unwrap_or_else(|e| {
580 log::error!("Process Error {}: {}", agent_id, e);
581 });
582 }
583 AgentMessage::Config { configs } => {
584 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
585 log::error!("Config Error {}: {}", agent_id, e);
586 });
587 }
588 AgentMessage::Stop => {
589 rx.close();
590 return;
591 }
592 }
593 }
594 });
595 tokio::task::yield_now().await;
596 }
597 }
598 Ok(())
599 }
600
601 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
602 let agent = {
603 let agents = self.agents.lock().unwrap();
604 let Some(a) = agents.get(agent_id) else {
605 return Err(AgentError::AgentNotFound(agent_id.to_string()));
606 };
607 a.clone()
608 };
609
610 let agent_status = {
611 let agent = agent.lock().await;
612 agent.status().clone()
613 };
614 if agent_status == AgentStatus::Start {
615 log::info!("Stopping agent {}", agent_id);
616
617 {
618 let mut agent_txs = self.agent_txs.lock().unwrap();
619 if let Some(tx) = agent_txs.swap_remove(agent_id) {
620 match tx {
621 AgentMessageSender::Sync(tx) => {
622 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
623 log::error!(
624 "Failed to send stop message to agent {}: {}",
625 agent_id,
626 e
627 );
628 });
629 }
630 AgentMessageSender::Async(tx) => {
631 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
632 log::error!(
633 "Failed to send stop message to agent {}: {}",
634 agent_id,
635 e
636 );
637 });
638 }
639 }
640 }
641 }
642
643 agent.lock().await.stop().await?;
644 }
645
646 Ok(())
647 }
648
649 pub async fn set_agent_configs(
650 &self,
651 agent_id: String,
652 configs: AgentConfigs,
653 ) -> Result<(), AgentError> {
654 let agent = {
655 let agents = self.agents.lock().unwrap();
656 let Some(a) = agents.get(&agent_id) else {
657 return Err(AgentError::AgentNotFound(agent_id.to_string()));
658 };
659 a.clone()
660 };
661
662 let agent_status = {
663 let agent = agent.lock().await;
664 agent.status().clone()
665 };
666 if agent_status == AgentStatus::Init {
667 agent.lock().await.set_configs(configs.clone())?;
668 } else if agent_status == AgentStatus::Start {
669 let tx = {
670 let agent_txs = self.agent_txs.lock().unwrap();
671 let Some(tx) = agent_txs.get(&agent_id) else {
672 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
673 };
674 tx.clone()
675 };
676 let message = AgentMessage::Config { configs };
677 match tx {
678 AgentMessageSender::Sync(tx) => {
679 tx.send(message).map_err(|_| {
680 AgentError::SendMessageFailed("Failed to send config message".to_string())
681 })?;
682 }
683 AgentMessageSender::Async(tx) => {
684 tx.send(message).await.map_err(|_| {
685 AgentError::SendMessageFailed("Failed to send config message".to_string())
686 })?;
687 }
688 }
689 }
690 Ok(())
691 }
692
693 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
694 let global_configs_map = self.global_configs_map.lock().unwrap();
695 global_configs_map.get(def_name).cloned()
696 }
697
698 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
699 let mut global_configs_map = self.global_configs_map.lock().unwrap();
700
701 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
702 global_configs_map.insert(def_name, configs);
703 return;
704 };
705
706 for (key, value) in configs {
707 existing_configs.set(key, value);
708 }
709 }
710
711 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
712 let global_configs_map = self.global_configs_map.lock().unwrap();
713 global_configs_map.clone()
714 }
715
716 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
717 for (agent_name, new_configs) in new_configs_map {
718 self.set_global_configs(agent_name, new_configs);
719 }
720 }
721
722 pub async fn agent_input(
723 &self,
724 agent_id: String,
725 ctx: AgentContext,
726 pin: String,
727 value: AgentValue,
728 ) -> Result<(), AgentError> {
729 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
730 let agents = self.agents.lock().unwrap();
731 let Some(a) = agents.get(&agent_id) else {
732 return Err(AgentError::AgentNotFound(agent_id.to_string()));
733 };
734 a.clone()
735 };
736
737 let agent_status = {
738 let agent = agent.lock().await;
739 agent.status().clone()
740 };
741 if agent_status != AgentStatus::Start {
742 return Ok(());
743 }
744
745 if pin.starts_with("config:") {
746 let config_key = pin[7..].to_string();
747 let mut agent = agent.lock().await;
748 agent.set_config(config_key.clone(), value.clone())?;
749 return Ok(());
750 }
751
752 let message = AgentMessage::Input {
753 ctx,
754 pin: pin.clone(),
755 value,
756 };
757
758 let tx = {
759 let agent_txs = self.agent_txs.lock().unwrap();
760 let Some(tx) = agent_txs.get(&agent_id) else {
761 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
762 };
763 tx.clone()
764 };
765 match tx {
766 AgentMessageSender::Sync(tx) => {
767 tx.send(message).map_err(|_| {
768 AgentError::SendMessageFailed("Failed to send input message".to_string())
769 })?;
770 }
771 AgentMessageSender::Async(tx) => {
772 tx.send(message).await.map_err(|_| {
773 AgentError::SendMessageFailed("Failed to send input message".to_string())
774 })?;
775 }
776 }
777
778 self.emit_agent_input(agent_id.to_string(), pin);
779
780 Ok(())
781 }
782
783 pub async fn send_agent_out(
784 &self,
785 agent_id: String,
786 ctx: AgentContext,
787 pin: String,
788 value: AgentValue,
789 ) -> Result<(), AgentError> {
790 message::send_agent_out(self, agent_id, ctx, pin, value).await
791 }
792
793 pub fn try_send_agent_out(
794 &self,
795 agent_id: String,
796 ctx: AgentContext,
797 pin: String,
798 value: AgentValue,
799 ) -> Result<(), AgentError> {
800 message::try_send_agent_out(self, agent_id, ctx, pin, value)
801 }
802
803 pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
804 self.try_send_board_out(name, AgentContext::new(), value)
805 }
806
807 pub(crate) fn try_send_board_out(
808 &self,
809 name: String,
810 ctx: AgentContext,
811 value: AgentValue,
812 ) -> Result<(), AgentError> {
813 message::try_send_board_out(self, name, ctx, value)
814 }
815
816 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
817 let (tx, mut rx) = mpsc::channel(4096);
819 {
820 let mut tx_lock = self.tx.lock().unwrap();
821 *tx_lock = Some(tx);
822 }
823
824 let askit = self.clone();
826 tokio::spawn(async move {
827 while let Some(message) = rx.recv().await {
828 use AgentEventMessage::*;
829
830 match message {
831 AgentOut {
832 agent,
833 ctx,
834 pin,
835 value,
836 } => {
837 message::agent_out(&askit, agent, ctx, pin, value).await;
838 }
839 BoardOut { name, ctx, value } => {
840 message::board_out(&askit, name, ctx, value).await;
841 }
842 }
843 }
844 });
845
846 tokio::task::yield_now().await;
847
848 Ok(())
849 }
850
851 async fn start_agent_streams(&self) -> Result<(), AgentError> {
852 let agent_stream_ids;
853 {
854 let agent_streams = self.streams.lock().unwrap();
855 agent_stream_ids = agent_streams.keys().cloned().collect::<Vec<_>>();
856 }
857 for id in agent_stream_ids {
858 self.start_agent_stream(&id).await.unwrap_or_else(|e| {
859 log::error!("Failed to start agent stream: {}", e);
860 });
861 }
862 Ok(())
863 }
864
865 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
866 let mut observers = self.observers.lock().unwrap();
867 let observer_id = new_observer_id();
868 observers.insert(observer_id, observer);
869 observer_id
870 }
871
872 pub fn unsubscribe(&self, observer_id: usize) {
873 let mut observers = self.observers.lock().unwrap();
874 observers.swap_remove(&observer_id);
875 }
876
877 pub(crate) fn emit_agent_config_updated(
878 &self,
879 agent_id: String,
880 key: String,
881 value: AgentValue,
882 ) {
883 self.notify_observers(ASKitEvent::AgentConfigUpdated(agent_id, key, value));
884 }
885
886 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
887 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
888 }
889
890 pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
891 self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
892 }
893
894 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
895 self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
896 }
897
898 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
899 if name.starts_with('%') {
901 return;
902 }
903 self.notify_observers(ASKitEvent::Board(name, value));
904 }
905
906 fn notify_observers(&self, event: ASKitEvent) {
907 let observers = self.observers.lock().unwrap();
908 for (_id, observer) in observers.iter() {
909 observer.notify(&event);
910 }
911 }
912}
913
914#[derive(Clone, Debug)]
915pub enum ASKitEvent {
916 AgentConfigUpdated(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), AgentSpecUpdated(String), Board(String, AgentValue), }
922
923pub trait ASKitObserver {
924 fn notify(&self, event: &ASKitEvent);
925}
926
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next observer id and advances the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
932
933#[derive(Clone)]
936pub enum AgentMessageSender {
937 Sync(std::sync::mpsc::Sender<AgentMessage>),
938 Async(mpsc::Sender<AgentMessage>),
939}