1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentDefaultConfigs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::value::AgentValue;
16
/// Capacity of the bounded tokio channel created for each agent that runs as
/// an async task (see `ASKit::start_agent`).
const MESSAGE_LIMIT: usize = 1024;
18
/// Shared handle to the agent runtime state.
///
/// Every field is wrapped in an `Arc`, so cloning an `ASKit` yields another
/// handle to the same underlying maps.
#[derive(Clone)]
pub struct ASKit {
    /// Agent instances keyed by agent (node) id.
    pub(crate) agents: Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,

    /// Message senders of started agents, keyed by agent id.
    pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,

    // NOTE(review): not read or written in this file; presumably maps a board
    // name to the ids of agents subscribed to it — confirm in `message`.
    pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,

    // NOTE(review): not read or written in this file; presumably the latest
    // value per board name — confirm in `message`.
    pub(crate) board_value: Arc<Mutex<HashMap<String, AgentValue>>>,

    /// Outgoing edges keyed by source agent id; each entry is
    /// `(target agent id, source handle, target handle)`.
    pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,

    /// Registered agent definitions, keyed by definition name.
    pub(crate) defs: Arc<Mutex<AgentDefinitions>>,

    /// Agent flows, keyed by flow id.
    pub(crate) flows: Arc<Mutex<AgentFlows>>,

    /// Global configs, keyed by agent definition name.
    pub(crate) global_configs_map: Arc<Mutex<HashMap<String, AgentConfigs>>>,

    /// Sender side of the event channel; `None` until the message loop has
    /// been spawned and again after `quit`.
    pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,

    /// Registered observers, keyed by the id returned from `subscribe`.
    pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
}
51
52impl ASKit {
53 pub fn new() -> Self {
54 Self {
55 agents: Default::default(),
56 agent_txs: Default::default(),
57 board_out_agents: Default::default(),
58 board_value: Default::default(),
59 edges: Default::default(),
60 defs: Default::default(),
61 flows: Default::default(),
62 global_configs_map: Default::default(),
63 tx: Arc::new(Mutex::new(None)),
64 observers: Default::default(),
65 }
66 }
67
68 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69 self.tx
70 .lock()
71 .unwrap()
72 .clone()
73 .ok_or(AgentError::TxNotInitialized)
74 }
75
76 pub fn init() -> Result<Self, AgentError> {
77 let askit = Self::new();
78 askit.register_agents();
79 Ok(askit)
80 }
81
    /// Registers agents by handing this instance to
    /// `registry::register_inventory_agents`.
    fn register_agents(&self) {
        registry::register_inventory_agents(self);
    }
85
86 pub async fn ready(&self) -> Result<(), AgentError> {
87 self.spawn_message_loop().await?;
88 self.start_agent_flows().await?;
89 Ok(())
90 }
91
92 pub fn quit(&self) {
93 let mut tx_lock = self.tx.lock().unwrap();
94 *tx_lock = None;
95 }
96
97 pub fn register_agent(&self, def: AgentDefinition) {
98 let def_name = def.name.clone();
99 let def_global_configs = def.global_configs.clone();
100
101 let mut defs = self.defs.lock().unwrap();
102 defs.insert(def.name.clone(), def);
103
104 if let Some(def_global_configs) = def_global_configs {
106 let mut new_configs = AgentConfigs::default();
107 for (key, config_entry) in def_global_configs.iter() {
108 new_configs.set(key.clone(), config_entry.value.clone());
109 }
110 self.set_global_configs(def_name, new_configs);
111 }
112 }
113
114 pub fn get_agent_definitions(&self) -> AgentDefinitions {
115 let defs = self.defs.lock().unwrap();
116 defs.clone()
117 }
118
119 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
120 let defs = self.defs.lock().unwrap();
121 defs.get(def_name).cloned()
122 }
123
124 pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentDefaultConfigs> {
125 let defs = self.defs.lock().unwrap();
126 let Some(def) = defs.get(def_name) else {
127 return None;
128 };
129 def.default_configs.clone()
130 }
131
132 pub fn get_agent_flows(&self) -> AgentFlows {
135 let flows = self.flows.lock().unwrap();
136 flows.clone()
137 }
138
139 pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
140 if !Self::is_valid_flow_name(name) {
141 return Err(AgentError::InvalidFlowName(name.into()));
142 }
143
144 let new_name = self.unique_flow_name(name);
145 let mut flows = self.flows.lock().unwrap();
146 let flow = AgentFlow::new(new_name.clone());
147 flows.insert(flow.id().to_string(), flow.clone());
148 Ok(flow)
149 }
150
151 pub fn rename_agent_flow(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
152 if !Self::is_valid_flow_name(new_name) {
153 return Err(AgentError::InvalidFlowName(new_name.into()));
154 }
155
156 let new_name = self.unique_flow_name(new_name);
158
159 let mut flows = self.flows.lock().unwrap();
160
161 let Some(mut flow) = flows.remove(id) else {
163 return Err(AgentError::RenameFlowFailed(id.into()));
164 };
165
166 flow.set_name(new_name.clone());
168 flows.insert(flow.id().to_string(), flow);
169 Ok(new_name)
170 }
171
172 fn is_valid_flow_name(new_name: &str) -> bool {
173 if new_name.trim().is_empty() {
175 return false;
176 }
177
178 if new_name.contains('/') {
180 if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
182 return false;
183 }
184 if new_name
186 .split('/')
187 .any(|segment| segment == "." || segment == "..")
188 {
189 return false;
190 }
191 }
192
193 let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
195 for c in invalid_chars {
196 if new_name.contains(c) {
197 return false;
198 }
199 }
200
201 true
202 }
203
204 pub fn unique_flow_name(&self, name: &str) -> String {
205 let mut new_name = name.trim().to_string();
206 let mut i = 2;
207 let flows = self.flows.lock().unwrap();
208 while flows.values().any(|flow| flow.name() == new_name) {
209 new_name = format!("{}{}", name, i);
210 i += 1;
211 }
212 new_name
213 }
214
215 pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
216 let id = agent_flow.id();
217
218 {
220 let mut flows = self.flows.lock().unwrap();
221 if flows.contains_key(id) {
222 return Err(AgentError::DuplicateId(id.into()));
223 }
224 flows.insert(id.to_string(), agent_flow.clone());
225 }
226
227 for node in agent_flow.nodes().iter() {
229 if let Err(e) = self.add_agent(id, node) {
230 log::error!("Failed to add_agent_node {}: {}", node.id, e);
231 }
232 }
233
234 for edge in agent_flow.edges().iter() {
236 self.add_edge(edge).unwrap_or_else(|e| {
237 log::error!("Failed to add_edge {}: {}", edge.source, e);
238 });
239 }
240
241 Ok(())
242 }
243
244 pub async fn remove_agent_flow(&self, id: &str) -> Result<(), AgentError> {
245 let flow = {
246 let mut flows = self.flows.lock().unwrap();
247 let Some(flow) = flows.remove(id) else {
248 return Err(AgentError::FlowNotFound(id.to_string()));
249 };
250 flow.clone()
251 };
252
253 flow.stop(self).await?;
254
255 for node in flow.nodes() {
257 self.remove_agent(&node.id).await?;
258 }
259 for edge in flow.edges() {
260 self.remove_edge(edge);
261 }
262
263 Ok(())
264 }
265
266 pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
267 let flow_id = flow.id();
268
269 let mut flows = self.flows.lock().unwrap();
270 flows.insert(flow_id.to_string(), flow);
271 Ok(())
272 }
273
274 pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
275 let def = self
276 .get_agent_definition(def_name)
277 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
278 AgentFlowNode::new(&def)
279 }
280
281 pub fn add_agent_flow_node(
282 &self,
283 flow_id: &str,
284 node: &AgentFlowNode,
285 ) -> Result<(), AgentError> {
286 let mut flows = self.flows.lock().unwrap();
287 let Some(flow) = flows.get_mut(flow_id) else {
288 return Err(AgentError::FlowNotFound(flow_id.to_string()));
289 };
290 flow.add_node(node.clone());
291 self.add_agent(flow_id, node)
292 }
293
294 pub(crate) fn add_agent(&self, flow_id: &str, node: &AgentFlowNode) -> Result<(), AgentError> {
295 let mut agents = self.agents.lock().unwrap();
296 if agents.contains_key(&node.id) {
297 return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
298 }
299 let mut agent = agent_new(
300 self.clone(),
301 node.id.clone(),
302 &node.def_name,
303 node.configs.clone(),
304 )?;
305 agent.set_flow_id(flow_id.to_string());
306 agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
307 Ok(())
308 }
309
310 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
311 let agents = self.agents.lock().unwrap();
312 agents.get(agent_id).cloned()
313 }
314
315 pub fn add_agent_flow_edge(
316 &self,
317 flow_id: &str,
318 edge: &AgentFlowEdge,
319 ) -> Result<(), AgentError> {
320 let mut flows = self.flows.lock().unwrap();
321 let Some(flow) = flows.get_mut(flow_id) else {
322 return Err(AgentError::FlowNotFound(flow_id.to_string()));
323 };
324 flow.add_edge(edge.clone());
325 self.add_edge(edge)?;
326 Ok(())
327 }
328
329 pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
330 {
332 let agents = self.agents.lock().unwrap();
333 if !agents.contains_key(&edge.source) {
334 return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
335 }
336 }
337
338 if edge.source_handle.is_empty() {
340 return Err(AgentError::EmptySourceHandle);
341 }
342 if edge.target_handle.is_empty() {
343 return Err(AgentError::EmptyTargetHandle);
344 }
345
346 let mut edges = self.edges.lock().unwrap();
347 if let Some(targets) = edges.get_mut(&edge.source) {
348 if targets
349 .iter()
350 .any(|(target, source_handle, target_handle)| {
351 *target == edge.target
352 && *source_handle == edge.source_handle
353 && *target_handle == edge.target_handle
354 })
355 {
356 return Err(AgentError::EdgeAlreadyExists);
357 }
358 targets.push((
359 edge.target.clone(),
360 edge.source_handle.clone(),
361 edge.target_handle.clone(),
362 ));
363 } else {
364 edges.insert(
365 edge.source.clone(),
366 vec![(
367 edge.target.clone(),
368 edge.source_handle.clone(),
369 edge.target_handle.clone(),
370 )],
371 );
372 }
373 Ok(())
374 }
375
376 pub async fn remove_agent_flow_node(
377 &self,
378 flow_id: &str,
379 node_id: &str,
380 ) -> Result<(), AgentError> {
381 {
382 let mut flows = self.flows.lock().unwrap();
383 let Some(flow) = flows.get_mut(flow_id) else {
384 return Err(AgentError::FlowNotFound(flow_id.to_string()));
385 };
386 flow.remove_node(node_id);
387 }
388 self.remove_agent(node_id).await?;
389 Ok(())
390 }
391
392 pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
393 self.stop_agent(agent_id).await?;
394
395 {
397 let mut edges = self.edges.lock().unwrap();
398 let mut sources_to_remove = Vec::new();
399 for (source, targets) in edges.iter_mut() {
400 targets.retain(|(target, _, _)| target != agent_id);
401 if targets.is_empty() {
402 sources_to_remove.push(source.clone());
403 }
404 }
405 for source in sources_to_remove {
406 edges.remove(&source);
407 }
408 edges.remove(agent_id);
409 }
410
411 {
413 let mut agents = self.agents.lock().unwrap();
414 agents.remove(agent_id);
415 }
416
417 Ok(())
418 }
419
420 pub fn remove_agent_flow_edge(&self, flow_id: &str, edge_id: &str) -> Result<(), AgentError> {
421 let mut flows = self.flows.lock().unwrap();
422 let Some(flow) = flows.get_mut(flow_id) else {
423 return Err(AgentError::FlowNotFound(flow_id.to_string()));
424 };
425 let Some(edge) = flow.remove_edge(edge_id) else {
426 return Err(AgentError::EdgeNotFound(edge_id.to_string()));
427 };
428 self.remove_edge(&edge);
429 Ok(())
430 }
431
432 pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
433 let mut edges = self.edges.lock().unwrap();
434 if let Some(targets) = edges.get_mut(&edge.source) {
435 targets.retain(|(target, source_handle, target_handle)| {
436 *target != edge.target
437 || *source_handle != edge.source_handle
438 || *target_handle != edge.target_handle
439 });
440 if targets.is_empty() {
441 edges.remove(&edge.source);
442 }
443 }
444 }
445
    /// Thin wrapper around `flow::copy_sub_flow`: forwards `nodes` and `edges`
    /// and returns the node and edge lists it produces.
    pub fn copy_sub_flow(
        &self,
        nodes: &Vec<AgentFlowNode>,
        edges: &Vec<AgentFlowEdge>,
    ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
        flow::copy_sub_flow(nodes, edges)
    }
453
454 pub async fn start_agent_flow(&self, id: &str) -> Result<(), AgentError> {
455 let flow = {
456 let flows = self.flows.lock().unwrap();
457 let Some(flow) = flows.get(id) else {
458 return Err(AgentError::FlowNotFound(id.to_string()));
459 };
460 flow.clone()
461 };
462 flow.start(self).await?;
463 Ok(())
464 }
465
466 pub async fn stop_agent_flow(&self, id: &str) -> Result<(), AgentError> {
467 let flow = {
468 let flows = self.flows.lock().unwrap();
469 let Some(flow) = flows.get(id) else {
470 return Err(AgentError::FlowNotFound(id.to_string()));
471 };
472 flow.clone()
473 };
474 flow.stop(self).await?;
475 Ok(())
476 }
477
478 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
479 let agent = {
480 let agents = self.agents.lock().unwrap();
481 let Some(a) = agents.get(agent_id) else {
482 return Err(AgentError::AgentNotFound(agent_id.to_string()));
483 };
484 a.clone()
485 };
486 let def_name = {
487 let agent = agent.lock().await;
488 agent.def_name().to_string()
489 };
490 let uses_native_thread = {
491 let defs = self.defs.lock().unwrap();
492 let Some(def) = defs.get(&def_name) else {
493 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
494 };
495 def.native_thread
496 };
497 let agent_status = {
498 let agent = agent.lock().await;
499 agent.status().clone()
500 };
501 if agent_status == AgentStatus::Init {
502 log::info!("Starting agent {}", agent_id);
503
504 if uses_native_thread {
505 let (tx, rx) = std::sync::mpsc::channel();
506
507 {
508 let mut agent_txs = self.agent_txs.lock().unwrap();
509 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
510 };
511
512 let agent_id = agent_id.to_string();
513 std::thread::spawn(async move || {
514 if let Err(e) = agent.lock().await.start().await {
515 log::error!("Failed to start agent {}: {}", agent_id, e);
516 }
517
518 while let Ok(message) = rx.recv() {
519 match message {
520 AgentMessage::Input { ctx, pin, value } => {
521 agent
522 .lock()
523 .await
524 .process(ctx, pin, value)
525 .await
526 .unwrap_or_else(|e| {
527 log::error!("Process Error {}: {}", agent_id, e);
528 });
529 }
530 AgentMessage::Config { configs } => {
531 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
532 log::error!("Config Error {}: {}", agent_id, e);
533 });
534 }
535 AgentMessage::Stop => {
536 break;
537 }
538 }
539 }
540 });
541 } else {
542 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
543
544 {
545 let mut agent_txs = self.agent_txs.lock().unwrap();
546 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
547 };
548
549 let agent_id = agent_id.to_string();
550 tokio::spawn(async move {
551 {
552 let mut agent_guard = agent.lock().await;
553 if let Err(e) = agent_guard.start().await {
554 log::error!("Failed to start agent {}: {}", agent_id, e);
555 }
556 }
557
558 while let Some(message) = rx.recv().await {
559 match message {
560 AgentMessage::Input { ctx, pin, value } => {
561 agent
562 .lock()
563 .await
564 .process(ctx, pin, value)
565 .await
566 .unwrap_or_else(|e| {
567 log::error!("Process Error {}: {}", agent_id, e);
568 });
569 }
570 AgentMessage::Config { configs } => {
571 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
572 log::error!("Config Error {}: {}", agent_id, e);
573 });
574 }
575 AgentMessage::Stop => {
576 rx.close();
577 return;
578 }
579 }
580 }
581 });
582 tokio::task::yield_now().await;
583 }
584 }
585 Ok(())
586 }
587
588 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
589 let agent = {
590 let agents = self.agents.lock().unwrap();
591 let Some(a) = agents.get(agent_id) else {
592 return Err(AgentError::AgentNotFound(agent_id.to_string()));
593 };
594 a.clone()
595 };
596
597 let agent_status = {
598 let agent = agent.lock().await;
599 agent.status().clone()
600 };
601 if agent_status == AgentStatus::Start {
602 log::info!("Stopping agent {}", agent_id);
603
604 {
605 let mut agent_txs = self.agent_txs.lock().unwrap();
606 if let Some(tx) = agent_txs.remove(agent_id) {
607 match tx {
608 AgentMessageSender::Sync(tx) => {
609 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
610 log::error!(
611 "Failed to send stop message to agent {}: {}",
612 agent_id,
613 e
614 );
615 });
616 }
617 AgentMessageSender::Async(tx) => {
618 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
619 log::error!(
620 "Failed to send stop message to agent {}: {}",
621 agent_id,
622 e
623 );
624 });
625 }
626 }
627 }
628 }
629
630 agent.lock().await.stop().await?;
631 }
632
633 Ok(())
634 }
635
636 pub async fn set_agent_configs(
637 &self,
638 agent_id: String,
639 configs: AgentConfigs,
640 ) -> Result<(), AgentError> {
641 let agent = {
642 let agents = self.agents.lock().unwrap();
643 let Some(a) = agents.get(&agent_id) else {
644 return Err(AgentError::AgentNotFound(agent_id.to_string()));
645 };
646 a.clone()
647 };
648
649 let agent_status = {
650 let agent = agent.lock().await;
651 agent.status().clone()
652 };
653 if agent_status == AgentStatus::Init {
654 agent.lock().await.set_configs(configs.clone())?;
655 } else if agent_status == AgentStatus::Start {
656 let tx = {
657 let agent_txs = self.agent_txs.lock().unwrap();
658 let Some(tx) = agent_txs.get(&agent_id) else {
659 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
660 };
661 tx.clone()
662 };
663 let message = AgentMessage::Config { configs };
664 match tx {
665 AgentMessageSender::Sync(tx) => {
666 tx.send(message).map_err(|_| {
667 AgentError::SendMessageFailed("Failed to send config message".to_string())
668 })?;
669 }
670 AgentMessageSender::Async(tx) => {
671 tx.send(message).await.map_err(|_| {
672 AgentError::SendMessageFailed("Failed to send config message".to_string())
673 })?;
674 }
675 }
676 }
677 Ok(())
678 }
679
680 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
681 let global_configs_map = self.global_configs_map.lock().unwrap();
682 global_configs_map.get(def_name).cloned()
683 }
684
685 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
686 let mut global_configs_map = self.global_configs_map.lock().unwrap();
687
688 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
689 global_configs_map.insert(def_name, configs);
690 return;
691 };
692
693 for (key, value) in configs {
694 existing_configs.set(key, value);
695 }
696 }
697
698 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
699 let global_configs_map = self.global_configs_map.lock().unwrap();
700 global_configs_map.clone()
701 }
702
703 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
704 for (agent_name, new_configs) in new_configs_map {
705 self.set_global_configs(agent_name, new_configs);
706 }
707 }
708
709 pub async fn agent_input(
710 &self,
711 agent_id: String,
712 ctx: AgentContext,
713 pin: String,
714 value: AgentValue,
715 ) -> Result<(), AgentError> {
716 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
717 let agents = self.agents.lock().unwrap();
718 let Some(a) = agents.get(&agent_id) else {
719 return Err(AgentError::AgentNotFound(agent_id.to_string()));
720 };
721 a.clone()
722 };
723
724 let agent_status = {
725 let agent = agent.lock().await;
726 agent.status().clone()
727 };
728 if agent_status != AgentStatus::Start {
729 return Ok(());
730 }
731
732 if pin.starts_with("config:") {
733 let config_key = pin[7..].to_string();
734 let mut agent = agent.lock().await;
735 agent.set_config(config_key.clone(), value.clone())?;
736 return Ok(());
737 }
738
739 let message = AgentMessage::Input {
740 ctx,
741 pin: pin.clone(),
742 value,
743 };
744
745 let tx = {
746 let agent_txs = self.agent_txs.lock().unwrap();
747 let Some(tx) = agent_txs.get(&agent_id) else {
748 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
749 };
750 tx.clone()
751 };
752 match tx {
753 AgentMessageSender::Sync(tx) => {
754 tx.send(message).map_err(|_| {
755 AgentError::SendMessageFailed("Failed to send input message".to_string())
756 })?;
757 }
758 AgentMessageSender::Async(tx) => {
759 tx.send(message).await.map_err(|_| {
760 AgentError::SendMessageFailed("Failed to send input message".to_string())
761 })?;
762 }
763 }
764
765 self.emit_agent_input(agent_id.to_string(), pin);
766
767 Ok(())
768 }
769
    /// Thin async wrapper that forwards all arguments, together with this
    /// instance, to `message::send_agent_out` and returns its result.
    pub async fn send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::send_agent_out(self, agent_id, ctx, pin, value).await
    }
779
    /// Synchronous counterpart of `send_agent_out`: forwards all arguments,
    /// together with this instance, to `message::try_send_agent_out`.
    pub fn try_send_agent_out(
        &self,
        agent_id: String,
        ctx: AgentContext,
        pin: String,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_agent_out(self, agent_id, ctx, pin, value)
    }
789
790 pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
791 self.try_send_board_out(name, AgentContext::new(), value)
792 }
793
    /// Thin wrapper that forwards all arguments, together with this instance,
    /// to `message::try_send_board_out` and returns its result.
    pub(crate) fn try_send_board_out(
        &self,
        name: String,
        ctx: AgentContext,
        value: AgentValue,
    ) -> Result<(), AgentError> {
        message::try_send_board_out(self, name, ctx, value)
    }
802
803 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
804 let (tx, mut rx) = mpsc::channel(4096);
806 {
807 let mut tx_lock = self.tx.lock().unwrap();
808 *tx_lock = Some(tx);
809 }
810
811 let askit = self.clone();
813 tokio::spawn(async move {
814 while let Some(message) = rx.recv().await {
815 use AgentEventMessage::*;
816
817 match message {
818 AgentOut {
819 agent,
820 ctx,
821 pin,
822 value,
823 } => {
824 message::agent_out(&askit, agent, ctx, pin, value).await;
825 }
826 BoardOut { name, ctx, value } => {
827 message::board_out(&askit, name, ctx, value).await;
828 }
829 }
830 }
831 });
832
833 tokio::task::yield_now().await;
834
835 Ok(())
836 }
837
838 async fn start_agent_flows(&self) -> Result<(), AgentError> {
839 let agent_flow_ids;
840 {
841 let agent_flows = self.flows.lock().unwrap();
842 agent_flow_ids = agent_flows.keys().cloned().collect::<Vec<_>>();
843 }
844 for id in agent_flow_ids {
845 self.start_agent_flow(&id).await.unwrap_or_else(|e| {
846 log::error!("Failed to start agent flow: {}", e);
847 });
848 }
849 Ok(())
850 }
851
852 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
853 let mut observers = self.observers.lock().unwrap();
854 let observer_id = new_observer_id();
855 observers.insert(observer_id, observer);
856 observer_id
857 }
858
859 pub fn unsubscribe(&self, observer_id: usize) {
860 let mut observers = self.observers.lock().unwrap();
861 observers.remove(&observer_id);
862 }
863
864 pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
865 self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
866 }
867
868 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
869 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
870 }
871
872 pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
873 self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
874 }
875
876 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
877 if name.starts_with('%') {
879 return;
880 }
881 self.notify_observers(ASKitEvent::Board(name, value));
882 }
883
884 fn notify_observers(&self, event: ASKitEvent) {
885 let observers = self.observers.lock().unwrap();
886 for (_id, observer) in observers.iter() {
887 observer.notify(&event);
888 }
889 }
890}
891
/// Events passed to registered `ASKitObserver`s.
#[derive(Clone, Debug)]
pub enum ASKitEvent {
    /// `(agent_id, key, value)` — emitted by `emit_agent_display`.
    AgentDisplay(String, String, AgentValue),
    /// `(agent_id, message)` — emitted by `emit_agent_error`.
    AgentError(String, String),
    /// `(agent_id, pin)` — emitted after an input message was sent to an agent.
    AgentIn(String, String),
    /// `(name, value)` — emitted by `emit_board` for boards whose name does
    /// not start with `%`.
    Board(String, AgentValue),
}
899
/// Receiver of `ASKitEvent`s; register implementations with
/// `ASKit::subscribe`.
pub trait ASKitObserver {
    /// Called once for every emitted event.
    fn notify(&self, event: &ASKitEvent);
}
903
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next observer id and advances the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    // Only uniqueness matters, so relaxed ordering is sufficient.
    let allocated = OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
    allocated
}
909
/// Sender half of an agent's message channel.
#[derive(Clone)]
pub enum AgentMessageSender {
    /// Standard-library channel, used for agents that run on a native thread.
    Sync(std::sync::mpsc::Sender<AgentMessage>),
    /// Tokio channel, used for agents that run as a tokio task.
    Async(mpsc::Sender<AgentMessage>),
}