1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentDefaultConfigs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::value::AgentValue;
16
17#[derive(Clone)]
18pub struct ASKit {
19 pub(crate) agents:
21 Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,
22
23 pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
25
26 pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
28
29 pub(crate) board_value: Arc<Mutex<HashMap<String, AgentValue>>>,
31
32 pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
34
35 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
37
38 pub(crate) flows: Arc<Mutex<AgentFlows>>,
40
41 pub(crate) global_configs_map: Arc<Mutex<HashMap<String, AgentConfigs>>>,
43
44 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
46
47 pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
49}
50
51impl ASKit {
52 pub fn new() -> Self {
53 Self {
54 agents: Default::default(),
55 agent_txs: Default::default(),
56 board_out_agents: Default::default(),
57 board_value: Default::default(),
58 edges: Default::default(),
59 defs: Default::default(),
60 flows: Default::default(),
61 global_configs_map: Default::default(),
62 tx: Arc::new(Mutex::new(None)),
63 observers: Default::default(),
64 }
65 }
66
67 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68 self.tx
69 .lock()
70 .unwrap()
71 .clone()
72 .ok_or(AgentError::TxNotInitialized)
73 }
74
75 pub fn init() -> Result<Self, AgentError> {
76 let askit = Self::new();
77 askit.register_agents();
78 Ok(askit)
79 }
80
81 fn register_agents(&self) {
82 registry::register_inventory_agents(self);
83 }
84
85 pub async fn ready(&self) -> Result<(), AgentError> {
86 self.spawn_message_loop()?;
87 self.start_agent_flows().await?;
88 Ok(())
89 }
90
91 pub fn quit(&self) {
92 let mut tx_lock = self.tx.lock().unwrap();
93 *tx_lock = None;
94 }
95
96 pub fn register_agent(&self, def: AgentDefinition) {
97 let def_name = def.name.clone();
98 let def_global_configs = def.global_configs.clone();
99
100 let mut defs = self.defs.lock().unwrap();
101 defs.insert(def.name.clone(), def);
102
103 if let Some(def_global_configs) = def_global_configs {
105 let mut new_configs = AgentConfigs::default();
106 for (key, config_entry) in def_global_configs.iter() {
107 new_configs.set(key.clone(), config_entry.value.clone());
108 }
109 self.set_global_configs(def_name, new_configs);
110 }
111 }
112
113 pub fn get_agent_definitions(&self) -> AgentDefinitions {
114 let defs = self.defs.lock().unwrap();
115 defs.clone()
116 }
117
118 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119 let defs = self.defs.lock().unwrap();
120 defs.get(def_name).cloned()
121 }
122
123 pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentDefaultConfigs> {
124 let defs = self.defs.lock().unwrap();
125 let Some(def) = defs.get(def_name) else {
126 return None;
127 };
128 def.default_configs.clone()
129 }
130
131 pub fn get_agent_flows(&self) -> AgentFlows {
134 let flows = self.flows.lock().unwrap();
135 flows.clone()
136 }
137
138 pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139 if !Self::is_valid_flow_name(name) {
140 return Err(AgentError::InvalidFlowName(name.into()));
141 }
142
143 let new_name = self.unique_flow_name(name);
144 let mut flows = self.flows.lock().unwrap();
145 let flow = AgentFlow::new(new_name.clone());
146 flows.insert(flow.id().to_string(), flow.clone());
147 Ok(flow)
148 }
149
150 pub fn rename_agent_flow(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
151 if !Self::is_valid_flow_name(new_name) {
152 return Err(AgentError::InvalidFlowName(new_name.into()));
153 }
154
155 let new_name = self.unique_flow_name(new_name);
157
158 let mut flows = self.flows.lock().unwrap();
159
160 let Some(mut flow) = flows.remove(id) else {
162 return Err(AgentError::RenameFlowFailed(id.into()));
163 };
164
165 flow.set_name(new_name.clone());
167 flows.insert(flow.id().to_string(), flow);
168 Ok(new_name)
169 }
170
171 fn is_valid_flow_name(new_name: &str) -> bool {
172 if new_name.trim().is_empty() {
174 return false;
175 }
176
177 if new_name.contains('/') {
179 if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
181 return false;
182 }
183 if new_name
185 .split('/')
186 .any(|segment| segment == "." || segment == "..")
187 {
188 return false;
189 }
190 }
191
192 let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
194 for c in invalid_chars {
195 if new_name.contains(c) {
196 return false;
197 }
198 }
199
200 true
201 }
202
203 pub fn unique_flow_name(&self, name: &str) -> String {
204 let mut new_name = name.trim().to_string();
205 let mut i = 2;
206 let flows = self.flows.lock().unwrap();
207 while flows.values().any(|flow| flow.name() == new_name) {
208 new_name = format!("{}{}", name, i);
209 i += 1;
210 }
211 new_name
212 }
213
214 pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215 let id = agent_flow.id();
216
217 {
219 let mut flows = self.flows.lock().unwrap();
220 if flows.contains_key(id) {
221 return Err(AgentError::DuplicateId(id.into()));
222 }
223 flows.insert(id.to_string(), agent_flow.clone());
224 }
225
226 for node in agent_flow.nodes().iter() {
228 self.add_agent(id, node).unwrap_or_else(|e| {
229 log::error!("Failed to add_agent_node {}: {}", node.id, e);
230 });
231 }
232
233 for edge in agent_flow.edges().iter() {
235 self.add_edge(edge).unwrap_or_else(|e| {
236 log::error!("Failed to add_edge {}: {}", edge.source, e);
237 });
238 }
239
240 Ok(())
241 }
242
243 pub async fn remove_agent_flow(&self, id: &str) -> Result<(), AgentError> {
244 let flow = {
245 let mut flows = self.flows.lock().unwrap();
246 let Some(flow) = flows.remove(id) else {
247 return Err(AgentError::FlowNotFound(id.to_string()));
248 };
249 flow.clone()
250 };
251
252 flow.stop(self).await?;
253
254 for node in flow.nodes() {
256 self.remove_agent(&node.id).await?;
257 }
258 for edge in flow.edges() {
259 self.remove_edge(edge);
260 }
261
262 Ok(())
263 }
264
265 pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266 let flow_id = flow.id();
267
268 let mut flows = self.flows.lock().unwrap();
269 flows.insert(flow_id.to_string(), flow);
270 Ok(())
271 }
272
273 pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
274 let def = self
275 .get_agent_definition(def_name)
276 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
277 AgentFlowNode::new(&def)
278 }
279
280 pub fn add_agent_flow_node(
281 &self,
282 flow_id: &str,
283 node: &AgentFlowNode,
284 ) -> Result<(), AgentError> {
285 let mut flows = self.flows.lock().unwrap();
286 let Some(flow) = flows.get_mut(flow_id) else {
287 return Err(AgentError::FlowNotFound(flow_id.to_string()));
288 };
289 flow.add_node(node.clone());
290 self.add_agent(flow_id, node)?;
291 Ok(())
292 }
293
294 pub(crate) fn add_agent(&self, flow_id: &str, node: &AgentFlowNode) -> Result<(), AgentError> {
295 let mut agents = self.agents.lock().unwrap();
296 if agents.contains_key(&node.id) {
297 return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
298 }
299 if let Ok(mut agent) = agent_new(
300 self.clone(),
301 node.id.clone(),
302 &node.def_name,
303 node.configs.clone(),
304 ) {
305 agent.set_flow_id(flow_id.to_string());
306 agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
307 } else {
308 return Err(AgentError::AgentCreationFailed(node.id.to_string()));
309 }
310 Ok(())
311 }
312
313 pub fn add_agent_flow_edge(
314 &self,
315 flow_id: &str,
316 edge: &AgentFlowEdge,
317 ) -> Result<(), AgentError> {
318 let mut flows = self.flows.lock().unwrap();
319 let Some(flow) = flows.get_mut(flow_id) else {
320 return Err(AgentError::FlowNotFound(flow_id.to_string()));
321 };
322 flow.add_edge(edge.clone());
323 self.add_edge(edge)?;
324 Ok(())
325 }
326
327 pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
328 {
330 let agents = self.agents.lock().unwrap();
331 if !agents.contains_key(&edge.source) {
332 return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
333 }
334 }
335
336 if edge.source_handle.is_empty() {
338 return Err(AgentError::EmptySourceHandle);
339 }
340 if edge.target_handle.is_empty() {
341 return Err(AgentError::EmptyTargetHandle);
342 }
343
344 let mut edges = self.edges.lock().unwrap();
345 if let Some(targets) = edges.get_mut(&edge.source) {
346 if targets
347 .iter()
348 .any(|(target, source_handle, target_handle)| {
349 *target == edge.target
350 && *source_handle == edge.source_handle
351 && *target_handle == edge.target_handle
352 })
353 {
354 return Err(AgentError::EdgeAlreadyExists);
355 }
356 targets.push((
357 edge.target.clone(),
358 edge.source_handle.clone(),
359 edge.target_handle.clone(),
360 ));
361 } else {
362 edges.insert(
363 edge.source.clone(),
364 vec![(
365 edge.target.clone(),
366 edge.source_handle.clone(),
367 edge.target_handle.clone(),
368 )],
369 );
370 }
371 Ok(())
372 }
373
374 pub async fn remove_agent_flow_node(
375 &self,
376 flow_id: &str,
377 node_id: &str,
378 ) -> Result<(), AgentError> {
379 {
380 let mut flows = self.flows.lock().unwrap();
381 let Some(flow) = flows.get_mut(flow_id) else {
382 return Err(AgentError::FlowNotFound(flow_id.to_string()));
383 };
384 flow.remove_node(node_id);
385 }
386 self.remove_agent(node_id).await?;
387 Ok(())
388 }
389
390 pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
391 self.stop_agent(agent_id).await?;
392
393 {
395 let mut edges = self.edges.lock().unwrap();
396 let mut sources_to_remove = Vec::new();
397 for (source, targets) in edges.iter_mut() {
398 targets.retain(|(target, _, _)| target != agent_id);
399 if targets.is_empty() {
400 sources_to_remove.push(source.clone());
401 }
402 }
403 for source in sources_to_remove {
404 edges.remove(&source);
405 }
406 edges.remove(agent_id);
407 }
408
409 {
411 let mut agents = self.agents.lock().unwrap();
412 agents.remove(agent_id);
413 }
414
415 Ok(())
416 }
417
418 pub fn remove_agent_flow_edge(&self, flow_id: &str, edge_id: &str) -> Result<(), AgentError> {
419 let mut flows = self.flows.lock().unwrap();
420 let Some(flow) = flows.get_mut(flow_id) else {
421 return Err(AgentError::FlowNotFound(flow_id.to_string()));
422 };
423 let Some(edge) = flow.remove_edge(edge_id) else {
424 return Err(AgentError::EdgeNotFound(edge_id.to_string()));
425 };
426 self.remove_edge(&edge);
427 Ok(())
428 }
429
430 pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
431 let mut edges = self.edges.lock().unwrap();
432 if let Some(targets) = edges.get_mut(&edge.source) {
433 targets.retain(|(target, source_handle, target_handle)| {
434 *target != edge.target
435 || *source_handle != edge.source_handle
436 || *target_handle != edge.target_handle
437 });
438 if targets.is_empty() {
439 edges.remove(&edge.source);
440 }
441 }
442 }
443
444 pub fn copy_sub_flow(
445 &self,
446 nodes: &Vec<AgentFlowNode>,
447 edges: &Vec<AgentFlowEdge>,
448 ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
449 flow::copy_sub_flow(nodes, edges)
450 }
451
452 pub async fn start_agent_flow(&self, id: &str) -> Result<(), AgentError> {
453 let flow = {
454 let flows = self.flows.lock().unwrap();
455 let Some(flow) = flows.get(id) else {
456 return Err(AgentError::FlowNotFound(id.to_string()));
457 };
458 flow.clone()
459 };
460 flow.start(self).await?;
461 Ok(())
462 }
463
464 pub async fn stop_agent_flow(&self, id: &str) -> Result<(), AgentError> {
465 let flow = {
466 let flows = self.flows.lock().unwrap();
467 let Some(flow) = flows.get(id) else {
468 return Err(AgentError::FlowNotFound(id.to_string()));
469 };
470 flow.clone()
471 };
472 flow.stop(self).await?;
473 Ok(())
474 }
475
476 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
477 let agent = {
478 let agents = self.agents.lock().unwrap();
479 let Some(a) = agents.get(agent_id) else {
480 return Err(AgentError::AgentNotFound(agent_id.to_string()));
481 };
482 a.clone()
483 };
484 let def_name = {
485 let agent = agent.lock().await;
486 agent.def_name().to_string()
487 };
488 let uses_native_thread = {
489 let defs = self.defs.lock().unwrap();
490 let Some(def) = defs.get(&def_name) else {
491 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
492 };
493 def.native_thread
494 };
495 let agent_status = {
496 let agent = agent.lock().await;
497 agent.status().clone()
498 };
499 if agent_status == AgentStatus::Init {
500 log::info!("Starting agent {}", agent_id);
501
502 if uses_native_thread {
503 let (tx, rx) = std::sync::mpsc::channel();
504
505 {
506 let mut agent_txs = self.agent_txs.lock().unwrap();
507 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
508 };
509
510 let agent_id = agent_id.to_string();
511 std::thread::spawn(async move || {
512 if let Err(e) = agent.lock().await.start().await {
513 log::error!("Failed to start agent {}: {}", agent_id, e);
514 }
515
516 while let Ok(message) = rx.recv() {
517 match message {
518 AgentMessage::Input { ctx, pin, value } => {
519 agent
520 .lock()
521 .await
522 .process(ctx, pin, value)
523 .await
524 .unwrap_or_else(|e| {
525 log::error!("Process Error {}: {}", agent_id, e);
526 });
527 }
528 AgentMessage::Config { configs } => {
529 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
530 log::error!("Config Error {}: {}", agent_id, e);
531 });
532 }
533 AgentMessage::Stop => {
534 break;
535 }
536 }
537 }
538 });
539 } else {
540 let (tx, mut rx) = mpsc::channel(32);
541
542 {
543 let mut agent_txs = self.agent_txs.lock().unwrap();
544 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
545 };
546
547 let agent_id = agent_id.to_string();
548 tokio::spawn(async move {
549 {
550 let mut agent_guard = agent.lock().await;
551 if let Err(e) = agent_guard.start().await {
552 log::error!("Failed to start agent {}: {}", agent_id, e);
553 }
554 }
555
556 while let Some(message) = rx.recv().await {
557 match message {
558 AgentMessage::Input { ctx, pin, value } => {
559 agent
560 .lock()
561 .await
562 .process(ctx, pin, value)
563 .await
564 .unwrap_or_else(|e| {
565 log::error!("Process Error {}: {}", agent_id, e);
566 });
567 }
568 AgentMessage::Config { configs } => {
569 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
570 log::error!("Config Error {}: {}", agent_id, e);
571 });
572 }
573 AgentMessage::Stop => {
574 rx.close();
575 return;
576 }
577 }
578 }
579 });
580 }
581 }
582 Ok(())
583 }
584
585 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
586 let agent = {
587 let agents = self.agents.lock().unwrap();
588 let Some(a) = agents.get(agent_id) else {
589 return Err(AgentError::AgentNotFound(agent_id.to_string()));
590 };
591 a.clone()
592 };
593
594 let agent_status = {
595 let agent = agent.lock().await;
596 agent.status().clone()
597 };
598 if agent_status == AgentStatus::Start {
599 log::info!("Stopping agent {}", agent_id);
600
601 {
602 let mut agent_txs = self.agent_txs.lock().unwrap();
603 if let Some(tx) = agent_txs.remove(agent_id) {
604 match tx {
605 AgentMessageSender::Sync(tx) => {
606 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
607 log::error!(
608 "Failed to send stop message to agent {}: {}",
609 agent_id,
610 e
611 );
612 });
613 }
614 AgentMessageSender::Async(tx) => {
615 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
616 log::error!(
617 "Failed to send stop message to agent {}: {}",
618 agent_id,
619 e
620 );
621 });
622 }
623 }
624 }
625 }
626
627 agent.lock().await.stop().await?;
628 }
629
630 Ok(())
631 }
632
633 pub async fn set_agent_configs(
634 &self,
635 agent_id: String,
636 configs: AgentConfigs,
637 ) -> Result<(), AgentError> {
638 let agent = {
639 let agents = self.agents.lock().unwrap();
640 let Some(a) = agents.get(&agent_id) else {
641 return Err(AgentError::AgentNotFound(agent_id.to_string()));
642 };
643 a.clone()
644 };
645
646 let agent_status = {
647 let agent = agent.lock().await;
648 agent.status().clone()
649 };
650 if agent_status == AgentStatus::Init {
651 agent.lock().await.set_configs(configs.clone())?;
652 } else if agent_status == AgentStatus::Start {
653 let tx = {
654 let agent_txs = self.agent_txs.lock().unwrap();
655 let Some(tx) = agent_txs.get(&agent_id) else {
656 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
657 };
658 tx.clone()
659 };
660 let message = AgentMessage::Config { configs };
661 match tx {
662 AgentMessageSender::Sync(tx) => {
663 tx.send(message).map_err(|_| {
664 AgentError::SendMessageFailed("Failed to send config message".to_string())
665 })?;
666 }
667 AgentMessageSender::Async(tx) => {
668 tx.send(message).await.map_err(|_| {
669 AgentError::SendMessageFailed("Failed to send config message".to_string())
670 })?;
671 }
672 }
673 }
674 Ok(())
675 }
676
677 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
678 let global_configs_map = self.global_configs_map.lock().unwrap();
679 global_configs_map.get(def_name).cloned()
680 }
681
682 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
683 let mut global_configs_map = self.global_configs_map.lock().unwrap();
684
685 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
686 global_configs_map.insert(def_name, configs);
687 return;
688 };
689
690 for (key, value) in configs {
691 existing_configs.set(key, value);
692 }
693 }
694
695 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
696 for (agent_name, new_configs) in new_configs_map {
697 self.set_global_configs(agent_name, new_configs);
698 }
699 }
700
701 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
702 let global_configs_map = self.global_configs_map.lock().unwrap();
703 global_configs_map.clone()
704 }
705
706 pub(crate) async fn agent_input(
707 &self,
708 agent_id: String,
709 ctx: AgentContext,
710 pin: String,
711 value: AgentValue,
712 ) -> Result<(), AgentError> {
713 let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
714 let agents = self.agents.lock().unwrap();
715 let Some(a) = agents.get(&agent_id) else {
716 return Err(AgentError::AgentNotFound(agent_id.to_string()));
717 };
718 a.clone()
719 };
720
721 let agent_status = {
722 let agent = agent.lock().await;
723 agent.status().clone()
724 };
725 if agent_status != AgentStatus::Start {
726 return Ok(());
727 }
728
729 if pin.starts_with("config:") {
730 let config_key = pin[7..].to_string();
731 let mut agent = agent.lock().await;
732 agent.set_config(config_key.clone(), value.clone())?;
733 return Ok(());
734 }
735
736 let message = AgentMessage::Input {
737 ctx,
738 pin: pin.clone(),
739 value,
740 };
741
742 let tx = {
743 let agent_txs = self.agent_txs.lock().unwrap();
744 let Some(tx) = agent_txs.get(&agent_id) else {
745 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
746 };
747 tx.clone()
748 };
749 match tx {
750 AgentMessageSender::Sync(tx) => {
751 tx.send(message).map_err(|_| {
752 AgentError::SendMessageFailed("Failed to send input message".to_string())
753 })?;
754 }
755 AgentMessageSender::Async(tx) => {
756 tx.send(message).await.map_err(|_| {
757 AgentError::SendMessageFailed("Failed to send input message".to_string())
758 })?;
759 }
760 }
761 self.emit_agent_input(agent_id.to_string(), pin);
762
763 Ok(())
764 }
765
766 pub async fn send_agent_out(
767 &self,
768 agent_id: String,
769 ctx: AgentContext,
770 pin: String,
771 value: AgentValue,
772 ) -> Result<(), AgentError> {
773 message::send_agent_out(self, agent_id, ctx, pin, value).await
774 }
775
776 pub fn try_send_agent_out(
777 &self,
778 agent_id: String,
779 ctx: AgentContext,
780 pin: String,
781 value: AgentValue,
782 ) -> Result<(), AgentError> {
783 message::try_send_agent_out(self, agent_id, ctx, pin, value)
784 }
785
786 pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
787 self.try_send_board_out(name, AgentContext::new(), value)
788 }
789
790 pub(crate) fn try_send_board_out(
791 &self,
792 name: String,
793 ctx: AgentContext,
794 value: AgentValue,
795 ) -> Result<(), AgentError> {
796 message::try_send_board_out(self, name, ctx, value)
797 }
798
799 fn spawn_message_loop(&self) -> Result<(), AgentError> {
800 let (tx, mut rx) = mpsc::channel(4096);
802 {
803 let mut tx_lock = self.tx.lock().unwrap();
804 *tx_lock = Some(tx);
805 }
806
807 let askit = self.clone();
809 tokio::spawn(async move {
810 while let Some(message) = rx.recv().await {
811 use AgentEventMessage::*;
812
813 match message {
814 AgentOut {
815 agent,
816 ctx,
817 pin,
818 value,
819 } => {
820 message::agent_out(&askit, agent, ctx, pin, value).await;
821 }
822 BoardOut { name, ctx, value } => {
823 message::board_out(&askit, name, ctx, value).await;
824 }
825 }
826 }
827 });
828
829 Ok(())
830 }
831
832 async fn start_agent_flows(&self) -> Result<(), AgentError> {
833 let agent_flow_ids;
834 {
835 let agent_flows = self.flows.lock().unwrap();
836 agent_flow_ids = agent_flows.keys().cloned().collect::<Vec<_>>();
837 }
838 for id in agent_flow_ids {
839 self.start_agent_flow(&id).await.unwrap_or_else(|e| {
840 log::error!("Failed to start agent flow: {}", e);
841 });
842 }
843 Ok(())
844 }
845
846 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
847 let mut observers = self.observers.lock().unwrap();
848 let observer_id = new_observer_id();
849 observers.insert(observer_id, observer);
850 observer_id
851 }
852
853 pub fn unsubscribe(&self, observer_id: usize) {
854 let mut observers = self.observers.lock().unwrap();
855 observers.remove(&observer_id);
856 }
857
858 pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
859 self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
860 }
861
862 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
863 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
864 }
865
866 pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
867 self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
868 }
869
870 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
871 if name.starts_with('%') {
873 return;
874 }
875 self.notify_observers(ASKitEvent::Board(name, value));
876 }
877
878 fn notify_observers(&self, event: ASKitEvent) {
879 let observers = self.observers.lock().unwrap();
880 for (_id, observer) in observers.iter() {
881 observer.notify(&event);
882 }
883 }
884}
885
886#[derive(Clone, Debug)]
887pub enum ASKitEvent {
888 AgentDisplay(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), Board(String, AgentValue), }
893
894pub trait ASKitObserver {
895 fn notify(&self, event: &ASKitEvent);
896}
897
898static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
899
900fn new_observer_id() -> usize {
901 OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
902}
903
904#[derive(Clone)]
907pub enum AgentMessageSender {
908 Sync(std::sync::mpsc::Sender<AgentMessage>),
909 Async(mpsc::Sender<AgentMessage>),
910}