1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::registry;
9use crate::config::{AgentConfigs, AgentConfigsMap};
10use crate::context::AgentContext;
11use crate::definition::{AgentDefaultConfigs, AgentDefinition, AgentDefinitions};
12use crate::error::AgentError;
13use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
14use crate::message::{self, AgentEventMessage};
15use crate::value::AgentValue;
16
/// Central handle of the agent runtime.
///
/// Every field is wrapped in an `Arc`, so cloning an `ASKit` yields another
/// handle onto the same shared state rather than an independent copy.
#[derive(Clone)]
pub struct ASKit {
    // Agent instances keyed by agent (node) id.
    pub(crate) agents:
        Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,

    // Message senders of started agents, keyed by agent id.
    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,

    // NOTE(review): presumably board name -> ids of agents fed by that board;
    // not used in this file, confirm against the `message` module.
    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,

    // NOTE(review): presumably the latest value per board name; confirm.
    pub(crate) board_value: Arc<Mutex<HashMap<String, AgentValue>>>,

    // Outgoing edges keyed by source agent id; each entry is
    // (target agent id, source handle, target handle).
    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,

    // Registered agent definitions, keyed by definition name.
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    // Known agent flows, keyed by flow name.
    pub(crate) flows: Arc<Mutex<AgentFlows>>,

    // Global configs keyed by agent definition name.
    pub(crate) global_configs_map: Arc<Mutex<HashMap<String, AgentConfigs>>>,

    // Sender into the event loop; `None` until the loop is spawned and after `quit`.
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    // Subscribed observers keyed by the id returned from `subscribe`.
    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
}
50
51impl ASKit {
52 pub fn new() -> Self {
53 Self {
54 agents: Default::default(),
55 agent_txs: Default::default(),
56 board_out_agents: Default::default(),
57 board_value: Default::default(),
58 edges: Default::default(),
59 defs: Default::default(),
60 flows: Default::default(),
61 global_configs_map: Default::default(),
62 tx: Arc::new(Mutex::new(None)),
63 observers: Default::default(),
64 }
65 }
66
67 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68 self.tx
69 .lock()
70 .unwrap()
71 .clone()
72 .ok_or(AgentError::TxNotInitialized)
73 }
74
75 pub fn init() -> Result<Self, AgentError> {
76 let askit = Self::new();
77 askit.register_agents();
78 Ok(askit)
79 }
80
/// Hands this kit to `registry::register_inventory_agents` so it can register
/// its agent definitions.
fn register_agents(&self) {
    registry::register_inventory_agents(self);
}
84
85 pub async fn ready(&self) -> Result<(), AgentError> {
86 self.spawn_message_loop()?;
87 self.start_agent_flows().await?;
88 Ok(())
89 }
90
91 pub fn quit(&self) {
92 let mut tx_lock = self.tx.lock().unwrap();
93 *tx_lock = None;
94 }
95
96 pub fn register_agent(&self, def: AgentDefinition) {
97 let def_name = def.name.clone();
98 let def_global_configs = def.global_configs.clone();
99
100 let mut defs = self.defs.lock().unwrap();
101 defs.insert(def.name.clone(), def);
102
103 if let Some(def_global_configs) = def_global_configs {
105 let mut new_configs = AgentConfigs::default();
106 for (key, config_entry) in def_global_configs.iter() {
107 new_configs.set(key.clone(), config_entry.value.clone());
108 }
109 self.set_global_configs(def_name, new_configs);
110 }
111 }
112
113 pub fn get_agent_definitions(&self) -> AgentDefinitions {
114 let defs = self.defs.lock().unwrap();
115 defs.clone()
116 }
117
118 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119 let defs = self.defs.lock().unwrap();
120 defs.get(def_name).cloned()
121 }
122
123 pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentDefaultConfigs> {
124 let defs = self.defs.lock().unwrap();
125 let Some(def) = defs.get(def_name) else {
126 return None;
127 };
128 def.default_configs.clone()
129 }
130
131 pub fn get_agent_flows(&self) -> AgentFlows {
134 let flows = self.flows.lock().unwrap();
135 flows.clone()
136 }
137
138 pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139 if !Self::is_valid_flow_name(name) {
140 return Err(AgentError::InvalidFlowName(name.into()));
141 }
142
143 let new_name = self.unique_flow_name(name);
144 let mut flows = self.flows.lock().unwrap();
145 let flow = AgentFlow::new(new_name.clone());
146 flows.insert(new_name, flow.clone());
147 Ok(flow)
148 }
149
150 pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
151 if !Self::is_valid_flow_name(new_name) {
152 return Err(AgentError::InvalidFlowName(new_name.into()));
153 }
154
155 let new_name = self.unique_flow_name(new_name);
157
158 let mut flows = self.flows.lock().unwrap();
159
160 let Some(mut flow) = flows.remove(old_name) else {
162 return Err(AgentError::RenameFlowFailed(old_name.into()));
163 };
164
165 flow.set_name(new_name.clone());
167 flows.insert(new_name.clone(), flow);
168 Ok(new_name)
169 }
170
/// Checks whether `new_name` is acceptable as a flow name.
///
/// A name is rejected when:
/// * it is empty or consists only of whitespace;
/// * any `/`-separated segment is empty (leading or trailing `/`, or `//`);
/// * any segment is `.` or `..` — this also applies to a name without any
///   `/`, so a bare `"."` or `".."` is rejected just like `"a/.."`;
/// * it contains one of `\ : * ? " < > |`.
fn is_valid_flow_name(new_name: &str) -> bool {
    const INVALID_CHARS: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    if new_name.trim().is_empty() {
        return false;
    }

    // Splitting always yields at least one segment, so the relative-path
    // check covers single-segment names as well.
    let has_bad_segment = new_name
        .split('/')
        .any(|segment| segment.is_empty() || segment == "." || segment == "..");
    if has_bad_segment {
        return false;
    }

    !new_name.chars().any(|c| INVALID_CHARS.contains(&c))
}
202
203 pub fn unique_flow_name(&self, name: &str) -> String {
204 let mut new_name = name.trim().to_string();
205 let mut i = 2;
206 let flows = self.flows.lock().unwrap();
207 while flows.contains_key(&new_name) {
208 new_name = format!("{}{}", name, i);
209 i += 1;
210 }
211 new_name
212 }
213
214 pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215 let name = agent_flow.name();
216
217 {
219 let mut flows = self.flows.lock().unwrap();
220 if flows.contains_key(name) {
221 return Err(AgentError::DuplicateFlowName(name.into()));
222 }
223 flows.insert(name.into(), agent_flow.clone());
224 }
225
226 for node in agent_flow.nodes().iter() {
228 self.add_agent(name, node).unwrap_or_else(|e| {
229 log::error!("Failed to add_agent_node {}: {}", node.id, e);
230 });
231 }
232
233 for edge in agent_flow.edges().iter() {
235 self.add_edge(edge).unwrap_or_else(|e| {
236 log::error!("Failed to add_edge {}: {}", edge.source, e);
237 });
238 }
239
240 Ok(())
241 }
242
243 pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
244 let flow = {
245 let mut flows = self.flows.lock().unwrap();
246 let Some(flow) = flows.remove(flow_name) else {
247 return Err(AgentError::FlowNotFound(flow_name.to_string()));
248 };
249 flow.clone()
250 };
251
252 flow.stop(self).await?;
253
254 for node in flow.nodes() {
256 self.remove_agent(&node.id).await?;
257 }
258 for edge in flow.edges() {
259 self.remove_edge(edge);
260 }
261
262 Ok(())
263 }
264
265 pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266 let flow_name = flow.name();
267
268 let mut flows = self.flows.lock().unwrap();
269 flows.insert(flow_name.to_string(), flow);
270 Ok(())
271 }
272
273 pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
274 let def = self
275 .get_agent_definition(def_name)
276 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
277 AgentFlowNode::new(&def)
278 }
279
280 pub fn add_agent_flow_node(
281 &self,
282 flow_name: &str,
283 node: &AgentFlowNode,
284 ) -> Result<(), AgentError> {
285 let mut flows = self.flows.lock().unwrap();
286 let Some(flow) = flows.get_mut(flow_name) else {
287 return Err(AgentError::FlowNotFound(flow_name.to_string()));
288 };
289 flow.add_node(node.clone());
290 self.add_agent(flow_name, node)?;
291 Ok(())
292 }
293
294 pub(crate) fn add_agent(
295 &self,
296 flow_name: &str,
297 node: &AgentFlowNode,
298 ) -> Result<(), AgentError> {
299 let mut agents = self.agents.lock().unwrap();
300 if agents.contains_key(&node.id) {
301 return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
302 }
303 if let Ok(mut agent) = agent_new(
304 self.clone(),
305 node.id.clone(),
306 &node.def_name,
307 node.configs.clone(),
308 ) {
309 agent.set_flow_name(flow_name.to_string());
310 agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
311 } else {
312 return Err(AgentError::AgentCreationFailed(node.id.to_string()));
313 }
314 Ok(())
315 }
316
317 pub fn add_agent_flow_edge(
318 &self,
319 flow_name: &str,
320 edge: &AgentFlowEdge,
321 ) -> Result<(), AgentError> {
322 let mut flows = self.flows.lock().unwrap();
323 let Some(flow) = flows.get_mut(flow_name) else {
324 return Err(AgentError::FlowNotFound(flow_name.to_string()));
325 };
326 flow.add_edge(edge.clone());
327 self.add_edge(edge)?;
328 Ok(())
329 }
330
331 pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
332 {
334 let agents = self.agents.lock().unwrap();
335 if !agents.contains_key(&edge.source) {
336 return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
337 }
338 }
339
340 if edge.source_handle.is_empty() {
342 return Err(AgentError::EmptySourceHandle);
343 }
344 if edge.target_handle.is_empty() {
345 return Err(AgentError::EmptyTargetHandle);
346 }
347
348 let mut edges = self.edges.lock().unwrap();
349 if let Some(targets) = edges.get_mut(&edge.source) {
350 if targets
351 .iter()
352 .any(|(target, source_handle, target_handle)| {
353 *target == edge.target
354 && *source_handle == edge.source_handle
355 && *target_handle == edge.target_handle
356 })
357 {
358 return Err(AgentError::EdgeAlreadyExists);
359 }
360 targets.push((
361 edge.target.clone(),
362 edge.source_handle.clone(),
363 edge.target_handle.clone(),
364 ));
365 } else {
366 edges.insert(
367 edge.source.clone(),
368 vec![(
369 edge.target.clone(),
370 edge.source_handle.clone(),
371 edge.target_handle.clone(),
372 )],
373 );
374 }
375 Ok(())
376 }
377
378 pub async fn remove_agent_flow_node(
379 &self,
380 flow_name: &str,
381 node_id: &str,
382 ) -> Result<(), AgentError> {
383 {
384 let mut flows = self.flows.lock().unwrap();
385 let Some(flow) = flows.get_mut(flow_name) else {
386 return Err(AgentError::FlowNotFound(flow_name.to_string()));
387 };
388 flow.remove_node(node_id);
389 }
390 self.remove_agent(node_id).await?;
391 Ok(())
392 }
393
394 pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
395 self.stop_agent(agent_id).await?;
396
397 {
399 let mut edges = self.edges.lock().unwrap();
400 let mut sources_to_remove = Vec::new();
401 for (source, targets) in edges.iter_mut() {
402 targets.retain(|(target, _, _)| target != agent_id);
403 if targets.is_empty() {
404 sources_to_remove.push(source.clone());
405 }
406 }
407 for source in sources_to_remove {
408 edges.remove(&source);
409 }
410 edges.remove(agent_id);
411 }
412
413 {
415 let mut agents = self.agents.lock().unwrap();
416 agents.remove(agent_id);
417 }
418
419 Ok(())
420 }
421
422 pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
423 let mut flows = self.flows.lock().unwrap();
424 let Some(flow) = flows.get_mut(flow_name) else {
425 return Err(AgentError::FlowNotFound(flow_name.to_string()));
426 };
427 let Some(edge) = flow.remove_edge(edge_id) else {
428 return Err(AgentError::EdgeNotFound(edge_id.to_string()));
429 };
430 self.remove_edge(&edge);
431 Ok(())
432 }
433
434 pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
435 let mut edges = self.edges.lock().unwrap();
436 if let Some(targets) = edges.get_mut(&edge.source) {
437 targets.retain(|(target, source_handle, target_handle)| {
438 *target != edge.target
439 || *source_handle != edge.source_handle
440 || *target_handle != edge.target_handle
441 });
442 if targets.is_empty() {
443 edges.remove(&edge.source);
444 }
445 }
446 }
447
/// Delegates to `flow::copy_sub_flow` and returns the nodes and edges it
/// produces. Does not touch any state of this kit.
pub fn copy_sub_flow(
    &self,
    nodes: &Vec<AgentFlowNode>,
    edges: &Vec<AgentFlowEdge>,
) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
    flow::copy_sub_flow(nodes, edges)
}
455
456 pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
457 let flow = {
458 let flows = self.flows.lock().unwrap();
459 let Some(flow) = flows.get(name) else {
460 return Err(AgentError::FlowNotFound(name.to_string()));
461 };
462 flow.clone()
463 };
464 flow.start(self).await?;
465 Ok(())
466 }
467
468 pub async fn stop_agent_flow(&self, name: &str) -> Result<(), AgentError> {
469 let flow = {
470 let flows = self.flows.lock().unwrap();
471 let Some(flow) = flows.get(name) else {
472 return Err(AgentError::FlowNotFound(name.to_string()));
473 };
474 flow.clone()
475 };
476 flow.stop(self).await?;
477 Ok(())
478 }
479
480 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
481 let agent = {
482 let agents = self.agents.lock().unwrap();
483 let Some(a) = agents.get(agent_id) else {
484 return Err(AgentError::AgentNotFound(agent_id.to_string()));
485 };
486 a.clone()
487 };
488 let def_name = {
489 let agent = agent.lock().await;
490 agent.def_name().to_string()
491 };
492 let uses_native_thread = {
493 let defs = self.defs.lock().unwrap();
494 let Some(def) = defs.get(&def_name) else {
495 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
496 };
497 def.native_thread
498 };
499 let agent_status = {
500 let agent = agent.lock().await;
501 agent.status().clone()
502 };
503 if agent_status == AgentStatus::Init {
504 log::info!("Starting agent {}", agent_id);
505
506 if uses_native_thread {
507 let (tx, rx) = std::sync::mpsc::channel();
508
509 {
510 let mut agent_txs = self.agent_txs.lock().unwrap();
511 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
512 };
513
514 let agent_id = agent_id.to_string();
515 std::thread::spawn(async move || {
516 if let Err(e) = agent.lock().await.start().await {
517 log::error!("Failed to start agent {}: {}", agent_id, e);
518 }
519
520 while let Ok(message) = rx.recv() {
521 match message {
522 AgentMessage::Input { ctx, pin, value } => {
523 agent
524 .lock()
525 .await
526 .process(ctx, pin, value)
527 .await
528 .unwrap_or_else(|e| {
529 log::error!("Process Error {}: {}", agent_id, e);
530 });
531 }
532 AgentMessage::Config { configs } => {
533 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
534 log::error!("Config Error {}: {}", agent_id, e);
535 });
536 }
537 AgentMessage::Stop => {
538 break;
539 }
540 }
541 }
542 });
543 } else {
544 let (tx, mut rx) = mpsc::channel(32);
545
546 {
547 let mut agent_txs = self.agent_txs.lock().unwrap();
548 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
549 };
550
551 let agent_id = agent_id.to_string();
552 tokio::spawn(async move {
553 {
554 let mut agent_guard = agent.lock().await;
555 if let Err(e) = agent_guard.start().await {
556 log::error!("Failed to start agent {}: {}", agent_id, e);
557 }
558 }
559
560 while let Some(message) = rx.recv().await {
561 match message {
562 AgentMessage::Input { ctx, pin, value } => {
563 agent
564 .lock()
565 .await
566 .process(ctx, pin, value)
567 .await
568 .unwrap_or_else(|e| {
569 log::error!("Process Error {}: {}", agent_id, e);
570 });
571 }
572 AgentMessage::Config { configs } => {
573 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
574 log::error!("Config Error {}: {}", agent_id, e);
575 });
576 }
577 AgentMessage::Stop => {
578 rx.close();
579 return;
580 }
581 }
582 }
583 });
584 }
585 }
586 Ok(())
587 }
588
589 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
590 let agent = {
591 let agents = self.agents.lock().unwrap();
592 let Some(a) = agents.get(agent_id) else {
593 return Err(AgentError::AgentNotFound(agent_id.to_string()));
594 };
595 a.clone()
596 };
597
598 let agent_status = {
599 let agent = agent.lock().await;
600 agent.status().clone()
601 };
602 if agent_status == AgentStatus::Start {
603 log::info!("Stopping agent {}", agent_id);
604
605 {
606 let mut agent_txs = self.agent_txs.lock().unwrap();
607 if let Some(tx) = agent_txs.remove(agent_id) {
608 match tx {
609 AgentMessageSender::Sync(tx) => {
610 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
611 log::error!(
612 "Failed to send stop message to agent {}: {}",
613 agent_id,
614 e
615 );
616 });
617 }
618 AgentMessageSender::Async(tx) => {
619 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
620 log::error!(
621 "Failed to send stop message to agent {}: {}",
622 agent_id,
623 e
624 );
625 });
626 }
627 }
628 }
629 }
630
631 agent.lock().await.stop().await?;
632 }
633
634 Ok(())
635 }
636
637 pub async fn set_agent_configs(
638 &self,
639 agent_id: String,
640 configs: AgentConfigs,
641 ) -> Result<(), AgentError> {
642 let agent = {
643 let agents = self.agents.lock().unwrap();
644 let Some(a) = agents.get(&agent_id) else {
645 return Err(AgentError::AgentNotFound(agent_id.to_string()));
646 };
647 a.clone()
648 };
649
650 let agent_status = {
651 let agent = agent.lock().await;
652 agent.status().clone()
653 };
654 if agent_status == AgentStatus::Init {
655 agent.lock().await.set_configs(configs.clone())?;
656 } else if agent_status == AgentStatus::Start {
657 let tx = {
658 let agent_txs = self.agent_txs.lock().unwrap();
659 let Some(tx) = agent_txs.get(&agent_id) else {
660 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
661 };
662 tx.clone()
663 };
664 let message = AgentMessage::Config { configs };
665 match tx {
666 AgentMessageSender::Sync(tx) => {
667 tx.send(message).map_err(|_| {
668 AgentError::SendMessageFailed("Failed to send config message".to_string())
669 })?;
670 }
671 AgentMessageSender::Async(tx) => {
672 tx.send(message).await.map_err(|_| {
673 AgentError::SendMessageFailed("Failed to send config message".to_string())
674 })?;
675 }
676 }
677 }
678 Ok(())
679 }
680
681 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
682 let global_configs_map = self.global_configs_map.lock().unwrap();
683 global_configs_map.get(def_name).cloned()
684 }
685
686 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
687 let mut global_configs_map = self.global_configs_map.lock().unwrap();
688
689 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
690 global_configs_map.insert(def_name, configs);
691 return;
692 };
693
694 for (key, value) in configs {
695 existing_configs.set(key, value);
696 }
697 }
698
699 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
700 for (agent_name, new_configs) in new_configs_map {
701 self.set_global_configs(agent_name, new_configs);
702 }
703 }
704
705 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
706 let global_configs_map = self.global_configs_map.lock().unwrap();
707 global_configs_map.clone()
708 }
709
710 pub(crate) async fn agent_input(
711 &self,
712 agent_id: String,
713 ctx: AgentContext,
714 pin: String,
715 value: AgentValue,
716 ) -> Result<(), AgentError> {
717 let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
718 let agents = self.agents.lock().unwrap();
719 let Some(a) = agents.get(&agent_id) else {
720 return Err(AgentError::AgentNotFound(agent_id.to_string()));
721 };
722 a.clone()
723 };
724
725 let agent_status = {
726 let agent = agent.lock().await;
727 agent.status().clone()
728 };
729 if agent_status != AgentStatus::Start {
730 return Ok(());
731 }
732
733 if pin.starts_with("config:") {
734 let config_key = pin[7..].to_string();
735 let mut agent = agent.lock().await;
736 agent.set_config(config_key.clone(), value.clone())?;
737 return Ok(());
738 }
739
740 let message = AgentMessage::Input {
741 ctx,
742 pin: pin.clone(),
743 value,
744 };
745
746 let tx = {
747 let agent_txs = self.agent_txs.lock().unwrap();
748 let Some(tx) = agent_txs.get(&agent_id) else {
749 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
750 };
751 tx.clone()
752 };
753 match tx {
754 AgentMessageSender::Sync(tx) => {
755 tx.send(message).map_err(|_| {
756 AgentError::SendMessageFailed("Failed to send input message".to_string())
757 })?;
758 }
759 AgentMessageSender::Async(tx) => {
760 tx.send(message).await.map_err(|_| {
761 AgentError::SendMessageFailed("Failed to send input message".to_string())
762 })?;
763 }
764 }
765 self.emit_agent_input(agent_id.to_string(), pin);
766
767 Ok(())
768 }
769
/// Forwards an agent output to `message::send_agent_out` and awaits it.
///
/// NOTE(review): presumably this queues the output on the event loop and
/// waits for channel capacity — confirm in the `message` module.
pub async fn send_agent_out(
    &self,
    agent_id: String,
    ctx: AgentContext,
    pin: String,
    value: AgentValue,
) -> Result<(), AgentError> {
    message::send_agent_out(self, agent_id, ctx, pin, value).await
}
779
/// Synchronous counterpart of `send_agent_out`; forwards to
/// `message::try_send_agent_out`.
///
/// NOTE(review): presumably fails instead of waiting when the event channel
/// is full — confirm in the `message` module.
pub fn try_send_agent_out(
    &self,
    agent_id: String,
    ctx: AgentContext,
    pin: String,
    value: AgentValue,
) -> Result<(), AgentError> {
    message::try_send_agent_out(self, agent_id, ctx, pin, value)
}
789
/// Sends `value` to the board `name` with a freshly created `AgentContext`,
/// via `try_send_board_out`.
pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
    self.try_send_board_out(name, AgentContext::new(), value)
}
793
/// Forwards a board output to `message::try_send_board_out`.
pub(crate) fn try_send_board_out(
    &self,
    name: String,
    ctx: AgentContext,
    value: AgentValue,
) -> Result<(), AgentError> {
    message::try_send_board_out(self, name, ctx, value)
}
802
803 fn spawn_message_loop(&self) -> Result<(), AgentError> {
804 let (tx, mut rx) = mpsc::channel(4096);
806 {
807 let mut tx_lock = self.tx.lock().unwrap();
808 *tx_lock = Some(tx);
809 }
810
811 let askit = self.clone();
813 tokio::spawn(async move {
814 while let Some(message) = rx.recv().await {
815 use AgentEventMessage::*;
816
817 match message {
818 AgentOut {
819 agent,
820 ctx,
821 pin,
822 value,
823 } => {
824 message::agent_out(&askit, agent, ctx, pin, value).await;
825 }
826 BoardOut { name, ctx, value } => {
827 message::board_out(&askit, name, ctx, value).await;
828 }
829 }
830 }
831 });
832
833 Ok(())
834 }
835
836 async fn start_agent_flows(&self) -> Result<(), AgentError> {
837 let agent_flow_names;
838 {
839 let agent_flows = self.flows.lock().unwrap();
840 agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
841 }
842 for name in agent_flow_names {
843 self.start_agent_flow(&name).await.unwrap_or_else(|e| {
844 log::error!("Failed to start agent flow: {}", e);
845 });
846 }
847 Ok(())
848 }
849
850 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
851 let mut observers = self.observers.lock().unwrap();
852 let observer_id = new_observer_id();
853 observers.insert(observer_id, observer);
854 observer_id
855 }
856
857 pub fn unsubscribe(&self, observer_id: usize) {
858 let mut observers = self.observers.lock().unwrap();
859 observers.remove(&observer_id);
860 }
861
/// Notifies all observers with an `ASKitEvent::AgentDisplay` event.
pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
    self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
}
865
/// Notifies all observers with an `ASKitEvent::AgentError` event.
pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
    self.notify_observers(ASKitEvent::AgentError(agent_id, message));
}
869
/// Notifies all observers with an `ASKitEvent::AgentIn` event.
pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
    self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
}
873
/// Notifies all observers with an `ASKitEvent::Board` event.
pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
    self.notify_observers(ASKitEvent::Board(name, value));
}
877
878 fn notify_observers(&self, event: ASKitEvent) {
879 let observers = self.observers.lock().unwrap();
880 for (_id, observer) in observers.iter() {
881 observer.notify(&event);
882 }
883 }
884}
885
886#[derive(Clone, Debug)]
887pub enum ASKitEvent {
888 AgentDisplay(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), Board(String, AgentValue), }
893
/// Receiver of `ASKitEvent`s; register implementations through
/// `ASKit::subscribe`.
pub trait ASKitObserver {
    /// Called once for every event emitted by the kit.
    fn notify(&self, event: &ASKitEvent);
}
897
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next observer id; every call yields the previous id plus one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    // Only uniqueness of the counter value matters, so relaxed ordering suffices.
    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
903
/// Sending half of an agent's message channel.
#[derive(Clone)]
pub enum AgentMessageSender {
    /// `std::sync::mpsc` sender, used for agents running on a native thread.
    Sync(std::sync::mpsc::Sender<AgentMessage>),
    /// tokio `mpsc` sender, used for agents running as a tokio task.
    Async(mpsc::Sender<AgentMessage>),
}