1use std::sync::atomic::AtomicUsize;
2use std::sync::{Arc, Mutex};
3
4use tokio::sync::{Mutex as AsyncMutex, mpsc};
5
6use crate::FnvIndexMap;
7use crate::agent::{Agent, AgentMessage, AgentSpec, AgentStatus, agent_new};
8use crate::config::{AgentConfigs, AgentConfigsMap};
9use crate::context::AgentContext;
10use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
11use crate::error::AgentError;
12use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
13use crate::message::{self, AgentEventMessage};
14use crate::registry;
15use crate::value::AgentValue;
16
/// Capacity of the bounded Tokio channel created for each agent that runs as
/// an async task.
const MESSAGE_LIMIT: usize = 1 << 10;
18
19#[derive(Clone)]
20pub struct ASKit {
21 pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
23
24 pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, AgentMessageSender>>>,
26
27 pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
29
30 pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
32
33 pub(crate) edges: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
35
36 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
38
39 pub(crate) flows: Arc<Mutex<AgentFlows>>,
41
42 pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
44
45 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
47
48 pub(crate) observers: Arc<Mutex<FnvIndexMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
50}
51
52impl ASKit {
53 pub fn new() -> Self {
54 Self {
55 agents: Default::default(),
56 agent_txs: Default::default(),
57 board_out_agents: Default::default(),
58 board_value: Default::default(),
59 edges: Default::default(),
60 defs: Default::default(),
61 flows: Default::default(),
62 global_configs_map: Default::default(),
63 tx: Arc::new(Mutex::new(None)),
64 observers: Default::default(),
65 }
66 }
67
68 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
69 self.tx
70 .lock()
71 .unwrap()
72 .clone()
73 .ok_or(AgentError::TxNotInitialized)
74 }
75
76 pub fn init() -> Result<Self, AgentError> {
77 let askit = Self::new();
78 askit.register_agents();
79 Ok(askit)
80 }
81
82 fn register_agents(&self) {
83 registry::register_inventory_agents(self);
84 }
85
86 pub async fn ready(&self) -> Result<(), AgentError> {
87 self.spawn_message_loop().await?;
88 self.start_agent_flows().await?;
89 Ok(())
90 }
91
92 pub fn quit(&self) {
93 let mut tx_lock = self.tx.lock().unwrap();
94 *tx_lock = None;
95 }
96
97 pub fn register_agent(&self, def: AgentDefinition) {
98 let def_name = def.name.clone();
99 let def_global_configs = def.global_configs.clone();
100
101 let mut defs = self.defs.lock().unwrap();
102 defs.insert(def.name.clone(), def);
103
104 if let Some(def_global_configs) = def_global_configs {
106 let mut new_configs = AgentConfigs::default();
107 for (key, config_entry) in def_global_configs.iter() {
108 new_configs.set(key.clone(), config_entry.value.clone());
109 }
110 self.set_global_configs(def_name, new_configs);
111 }
112 }
113
114 pub fn get_agent_definitions(&self) -> AgentDefinitions {
115 let defs = self.defs.lock().unwrap();
116 defs.clone()
117 }
118
119 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
120 let defs = self.defs.lock().unwrap();
121 defs.get(def_name).cloned()
122 }
123
124 pub fn get_agent_default_configs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
125 let defs = self.defs.lock().unwrap();
126 let Some(def) = defs.get(def_name) else {
127 return None;
128 };
129 def.default_configs.clone()
130 }
131
132 pub fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
133 let agents = self.agents.lock().unwrap();
134 let Some(agent) = agents.get(agent_id) else {
135 return None;
136 };
137 let agent = agent.blocking_lock();
138 Some(agent.spec().clone())
139 }
140
141 pub fn get_agent_flows(&self) -> AgentFlows {
144 let flows = self.flows.lock().unwrap();
145 flows.clone()
146 }
147
148 pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
149 if !Self::is_valid_flow_name(name) {
150 return Err(AgentError::InvalidFlowName(name.into()));
151 }
152
153 let new_name = self.unique_flow_name(name);
154 let mut flows = self.flows.lock().unwrap();
155 let flow = AgentFlow::new(new_name.clone());
156 flows.insert(flow.id().to_string(), flow.clone());
157 Ok(flow)
158 }
159
160 pub fn rename_agent_flow(&self, id: &str, new_name: &str) -> Result<String, AgentError> {
161 if !Self::is_valid_flow_name(new_name) {
162 return Err(AgentError::InvalidFlowName(new_name.into()));
163 }
164
165 let new_name = self.unique_flow_name(new_name);
167
168 let mut flows = self.flows.lock().unwrap();
169
170 let Some(mut flow) = flows.swap_remove(id) else {
172 return Err(AgentError::RenameFlowFailed(id.into()));
173 };
174
175 flow.set_name(new_name.clone());
177 flows.insert(flow.id().to_string(), flow);
178 Ok(new_name)
179 }
180
/// Checks whether `new_name` is acceptable as a flow name.
///
/// A name is rejected when:
/// * it is empty or consists only of whitespace,
/// * any `/`-separated segment is empty (leading, trailing or doubled `/`),
/// * any segment is `.` or `..` — this now also covers a name that is a
///   bare `.` or `..`, which previously passed because the segment check
///   only ran when the name contained a `/`,
/// * it contains one of `\ : * ? " < > |`.
fn is_valid_flow_name(new_name: &str) -> bool {
    const FORBIDDEN: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    if new_name.trim().is_empty() {
        return false;
    }

    // Splitting a slash-free name yields the name itself as the only
    // segment, so a single pass covers both the plain and the nested case.
    let has_bad_segment = new_name
        .split('/')
        .any(|segment| segment.is_empty() || segment == "." || segment == "..");
    if has_bad_segment {
        return false;
    }

    !new_name.chars().any(|c| FORBIDDEN.contains(&c))
}
212
213 pub fn unique_flow_name(&self, name: &str) -> String {
214 let mut new_name = name.trim().to_string();
215 let mut i = 2;
216 let flows = self.flows.lock().unwrap();
217 while flows.values().any(|flow| flow.name() == new_name) {
218 new_name = format!("{}{}", name, i);
219 i += 1;
220 }
221 new_name
222 }
223
224 pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
225 let id = agent_flow.id();
226
227 {
229 let mut flows = self.flows.lock().unwrap();
230 if flows.contains_key(id) {
231 return Err(AgentError::DuplicateId(id.into()));
232 }
233 flows.insert(id.to_string(), agent_flow.clone());
234 }
235
236 for node in agent_flow.nodes().iter() {
238 if let Err(e) = self.add_agent(id, node) {
239 log::error!("Failed to add_agent_node {}: {}", node.id, e);
240 }
241 }
242
243 for edge in agent_flow.edges().iter() {
245 self.add_edge(edge).unwrap_or_else(|e| {
246 log::error!("Failed to add_edge {}: {}", edge.source, e);
247 });
248 }
249
250 Ok(())
251 }
252
253 pub async fn remove_agent_flow(&self, id: &str) -> Result<(), AgentError> {
254 let flow = {
255 let mut flows = self.flows.lock().unwrap();
256 let Some(flow) = flows.swap_remove(id) else {
257 return Err(AgentError::FlowNotFound(id.to_string()));
258 };
259 flow.clone()
260 };
261
262 flow.stop(self).await?;
263
264 for node in flow.nodes() {
266 self.remove_agent(&node.id).await?;
267 }
268 for edge in flow.edges() {
269 self.remove_edge(edge);
270 }
271
272 Ok(())
273 }
274
275 pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
276 let flow_id = flow.id();
277
278 let mut flows = self.flows.lock().unwrap();
279 flows.insert(flow_id.to_string(), flow);
280 Ok(())
281 }
282
283 pub fn new_agent_flow_node(&self, def_name: &str) -> Result<AgentFlowNode, AgentError> {
284 let def = self
285 .get_agent_definition(def_name)
286 .ok_or_else(|| AgentError::AgentDefinitionNotFound(def_name.to_string()))?;
287 AgentFlowNode::new(&def)
288 }
289
290 pub fn add_agent_flow_node(
291 &self,
292 flow_id: &str,
293 node: &AgentFlowNode,
294 ) -> Result<(), AgentError> {
295 let mut flows = self.flows.lock().unwrap();
296 let Some(flow) = flows.get_mut(flow_id) else {
297 return Err(AgentError::FlowNotFound(flow_id.to_string()));
298 };
299 flow.add_node(node.clone());
300 self.add_agent(flow_id, node)
301 }
302
303 pub(crate) fn add_agent(&self, flow_id: &str, node: &AgentFlowNode) -> Result<(), AgentError> {
304 let mut agents = self.agents.lock().unwrap();
305 if agents.contains_key(&node.id) {
306 return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
307 }
308 let mut agent = agent_new(self.clone(), node.id.clone(), node.spec.clone())?;
309 agent.set_flow_id(flow_id.to_string());
310 agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
311 Ok(())
312 }
313
314 pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
315 let agents = self.agents.lock().unwrap();
316 agents.get(agent_id).cloned()
317 }
318
319 pub fn add_agent_flow_edge(
320 &self,
321 flow_id: &str,
322 edge: &AgentFlowEdge,
323 ) -> Result<(), AgentError> {
324 let mut flows = self.flows.lock().unwrap();
325 let Some(flow) = flows.get_mut(flow_id) else {
326 return Err(AgentError::FlowNotFound(flow_id.to_string()));
327 };
328 flow.add_edge(edge.clone());
329 self.add_edge(edge)?;
330 Ok(())
331 }
332
333 pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
334 {
336 let agents = self.agents.lock().unwrap();
337 if !agents.contains_key(&edge.source) {
338 return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
339 }
340 }
341
342 if edge.source_handle.is_empty() {
344 return Err(AgentError::EmptySourceHandle);
345 }
346 if edge.target_handle.is_empty() {
347 return Err(AgentError::EmptyTargetHandle);
348 }
349
350 let mut edges = self.edges.lock().unwrap();
351 if let Some(targets) = edges.get_mut(&edge.source) {
352 if targets
353 .iter()
354 .any(|(target, source_handle, target_handle)| {
355 *target == edge.target
356 && *source_handle == edge.source_handle
357 && *target_handle == edge.target_handle
358 })
359 {
360 return Err(AgentError::EdgeAlreadyExists);
361 }
362 targets.push((
363 edge.target.clone(),
364 edge.source_handle.clone(),
365 edge.target_handle.clone(),
366 ));
367 } else {
368 edges.insert(
369 edge.source.clone(),
370 vec![(
371 edge.target.clone(),
372 edge.source_handle.clone(),
373 edge.target_handle.clone(),
374 )],
375 );
376 }
377 Ok(())
378 }
379
380 pub async fn remove_agent_flow_node(
381 &self,
382 flow_id: &str,
383 node_id: &str,
384 ) -> Result<(), AgentError> {
385 {
386 let mut flows = self.flows.lock().unwrap();
387 let Some(flow) = flows.get_mut(flow_id) else {
388 return Err(AgentError::FlowNotFound(flow_id.to_string()));
389 };
390 flow.remove_node(node_id);
391 }
392 self.remove_agent(node_id).await?;
393 Ok(())
394 }
395
396 pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
397 self.stop_agent(agent_id).await?;
398
399 {
401 let mut edges = self.edges.lock().unwrap();
402 let mut sources_to_remove = Vec::new();
403 for (source, targets) in edges.iter_mut() {
404 targets.retain(|(target, _, _)| target != agent_id);
405 if targets.is_empty() {
406 sources_to_remove.push(source.clone());
407 }
408 }
409 for source in sources_to_remove {
410 edges.swap_remove(&source);
411 }
412 edges.swap_remove(agent_id);
413 }
414
415 {
417 let mut agents = self.agents.lock().unwrap();
418 agents.swap_remove(agent_id);
419 }
420
421 Ok(())
422 }
423
424 pub fn remove_agent_flow_edge(&self, flow_id: &str, edge_id: &str) -> Result<(), AgentError> {
425 let mut flows = self.flows.lock().unwrap();
426 let Some(flow) = flows.get_mut(flow_id) else {
427 return Err(AgentError::FlowNotFound(flow_id.to_string()));
428 };
429 let Some(edge) = flow.remove_edge(edge_id) else {
430 return Err(AgentError::EdgeNotFound(edge_id.to_string()));
431 };
432 self.remove_edge(&edge);
433 Ok(())
434 }
435
436 pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
437 let mut edges = self.edges.lock().unwrap();
438 if let Some(targets) = edges.get_mut(&edge.source) {
439 targets.retain(|(target, source_handle, target_handle)| {
440 *target != edge.target
441 || *source_handle != edge.source_handle
442 || *target_handle != edge.target_handle
443 });
444 if targets.is_empty() {
445 edges.swap_remove(&edge.source);
446 }
447 }
448 }
449
450 pub fn copy_sub_flow(
451 &self,
452 nodes: &Vec<AgentFlowNode>,
453 edges: &Vec<AgentFlowEdge>,
454 ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
455 flow::copy_sub_flow(nodes, edges)
456 }
457
458 pub async fn start_agent_flow(&self, id: &str) -> Result<(), AgentError> {
459 let flow = {
460 let flows = self.flows.lock().unwrap();
461 let Some(flow) = flows.get(id) else {
462 return Err(AgentError::FlowNotFound(id.to_string()));
463 };
464 flow.clone()
465 };
466 flow.start(self).await?;
467 Ok(())
468 }
469
470 pub async fn stop_agent_flow(&self, id: &str) -> Result<(), AgentError> {
471 let flow = {
472 let flows = self.flows.lock().unwrap();
473 let Some(flow) = flows.get(id) else {
474 return Err(AgentError::FlowNotFound(id.to_string()));
475 };
476 flow.clone()
477 };
478 flow.stop(self).await?;
479 Ok(())
480 }
481
482 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
483 let agent = {
484 let agents = self.agents.lock().unwrap();
485 let Some(a) = agents.get(agent_id) else {
486 return Err(AgentError::AgentNotFound(agent_id.to_string()));
487 };
488 a.clone()
489 };
490 let def_name = {
491 let agent = agent.lock().await;
492 agent.def_name().to_string()
493 };
494 let uses_native_thread = {
495 let defs = self.defs.lock().unwrap();
496 let Some(def) = defs.get(&def_name) else {
497 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
498 };
499 def.native_thread
500 };
501 let agent_status = {
502 let agent = agent.lock().await;
503 agent.status().clone()
504 };
505 if agent_status == AgentStatus::Init {
506 log::info!("Starting agent {}", agent_id);
507
508 if uses_native_thread {
509 let (tx, rx) = std::sync::mpsc::channel();
510
511 {
512 let mut agent_txs = self.agent_txs.lock().unwrap();
513 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
514 };
515
516 let agent_id = agent_id.to_string();
517 std::thread::spawn(async move || {
518 if let Err(e) = agent.lock().await.start().await {
519 log::error!("Failed to start agent {}: {}", agent_id, e);
520 }
521
522 while let Ok(message) = rx.recv() {
523 match message {
524 AgentMessage::Input { ctx, pin, value } => {
525 agent
526 .lock()
527 .await
528 .process(ctx, pin, value)
529 .await
530 .unwrap_or_else(|e| {
531 log::error!("Process Error {}: {}", agent_id, e);
532 });
533 }
534 AgentMessage::Config { configs } => {
535 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
536 log::error!("Config Error {}: {}", agent_id, e);
537 });
538 }
539 AgentMessage::Stop => {
540 break;
541 }
542 }
543 }
544 });
545 } else {
546 let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
547
548 {
549 let mut agent_txs = self.agent_txs.lock().unwrap();
550 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
551 };
552
553 let agent_id = agent_id.to_string();
554 tokio::spawn(async move {
555 {
556 let mut agent_guard = agent.lock().await;
557 if let Err(e) = agent_guard.start().await {
558 log::error!("Failed to start agent {}: {}", agent_id, e);
559 }
560 }
561
562 while let Some(message) = rx.recv().await {
563 match message {
564 AgentMessage::Input { ctx, pin, value } => {
565 agent
566 .lock()
567 .await
568 .process(ctx, pin, value)
569 .await
570 .unwrap_or_else(|e| {
571 log::error!("Process Error {}: {}", agent_id, e);
572 });
573 }
574 AgentMessage::Config { configs } => {
575 agent.lock().await.set_configs(configs).unwrap_or_else(|e| {
576 log::error!("Config Error {}: {}", agent_id, e);
577 });
578 }
579 AgentMessage::Stop => {
580 rx.close();
581 return;
582 }
583 }
584 }
585 });
586 tokio::task::yield_now().await;
587 }
588 }
589 Ok(())
590 }
591
592 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
593 let agent = {
594 let agents = self.agents.lock().unwrap();
595 let Some(a) = agents.get(agent_id) else {
596 return Err(AgentError::AgentNotFound(agent_id.to_string()));
597 };
598 a.clone()
599 };
600
601 let agent_status = {
602 let agent = agent.lock().await;
603 agent.status().clone()
604 };
605 if agent_status == AgentStatus::Start {
606 log::info!("Stopping agent {}", agent_id);
607
608 {
609 let mut agent_txs = self.agent_txs.lock().unwrap();
610 if let Some(tx) = agent_txs.swap_remove(agent_id) {
611 match tx {
612 AgentMessageSender::Sync(tx) => {
613 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
614 log::error!(
615 "Failed to send stop message to agent {}: {}",
616 agent_id,
617 e
618 );
619 });
620 }
621 AgentMessageSender::Async(tx) => {
622 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
623 log::error!(
624 "Failed to send stop message to agent {}: {}",
625 agent_id,
626 e
627 );
628 });
629 }
630 }
631 }
632 }
633
634 agent.lock().await.stop().await?;
635 }
636
637 Ok(())
638 }
639
640 pub async fn set_agent_configs(
641 &self,
642 agent_id: String,
643 configs: AgentConfigs,
644 ) -> Result<(), AgentError> {
645 let agent = {
646 let agents = self.agents.lock().unwrap();
647 let Some(a) = agents.get(&agent_id) else {
648 return Err(AgentError::AgentNotFound(agent_id.to_string()));
649 };
650 a.clone()
651 };
652
653 let agent_status = {
654 let agent = agent.lock().await;
655 agent.status().clone()
656 };
657 if agent_status == AgentStatus::Init {
658 agent.lock().await.set_configs(configs.clone())?;
659 } else if agent_status == AgentStatus::Start {
660 let tx = {
661 let agent_txs = self.agent_txs.lock().unwrap();
662 let Some(tx) = agent_txs.get(&agent_id) else {
663 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
664 };
665 tx.clone()
666 };
667 let message = AgentMessage::Config { configs };
668 match tx {
669 AgentMessageSender::Sync(tx) => {
670 tx.send(message).map_err(|_| {
671 AgentError::SendMessageFailed("Failed to send config message".to_string())
672 })?;
673 }
674 AgentMessageSender::Async(tx) => {
675 tx.send(message).await.map_err(|_| {
676 AgentError::SendMessageFailed("Failed to send config message".to_string())
677 })?;
678 }
679 }
680 }
681 Ok(())
682 }
683
684 pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
685 let global_configs_map = self.global_configs_map.lock().unwrap();
686 global_configs_map.get(def_name).cloned()
687 }
688
689 pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
690 let mut global_configs_map = self.global_configs_map.lock().unwrap();
691
692 let Some(existing_configs) = global_configs_map.get_mut(&def_name) else {
693 global_configs_map.insert(def_name, configs);
694 return;
695 };
696
697 for (key, value) in configs {
698 existing_configs.set(key, value);
699 }
700 }
701
702 pub fn get_global_configs_map(&self) -> AgentConfigsMap {
703 let global_configs_map = self.global_configs_map.lock().unwrap();
704 global_configs_map.clone()
705 }
706
707 pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
708 for (agent_name, new_configs) in new_configs_map {
709 self.set_global_configs(agent_name, new_configs);
710 }
711 }
712
713 pub async fn agent_input(
714 &self,
715 agent_id: String,
716 ctx: AgentContext,
717 pin: String,
718 value: AgentValue,
719 ) -> Result<(), AgentError> {
720 let agent: Arc<AsyncMutex<Box<dyn Agent>>> = {
721 let agents = self.agents.lock().unwrap();
722 let Some(a) = agents.get(&agent_id) else {
723 return Err(AgentError::AgentNotFound(agent_id.to_string()));
724 };
725 a.clone()
726 };
727
728 let agent_status = {
729 let agent = agent.lock().await;
730 agent.status().clone()
731 };
732 if agent_status != AgentStatus::Start {
733 return Ok(());
734 }
735
736 if pin.starts_with("config:") {
737 let config_key = pin[7..].to_string();
738 let mut agent = agent.lock().await;
739 agent.set_config(config_key.clone(), value.clone())?;
740 return Ok(());
741 }
742
743 let message = AgentMessage::Input {
744 ctx,
745 pin: pin.clone(),
746 value,
747 };
748
749 let tx = {
750 let agent_txs = self.agent_txs.lock().unwrap();
751 let Some(tx) = agent_txs.get(&agent_id) else {
752 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
753 };
754 tx.clone()
755 };
756 match tx {
757 AgentMessageSender::Sync(tx) => {
758 tx.send(message).map_err(|_| {
759 AgentError::SendMessageFailed("Failed to send input message".to_string())
760 })?;
761 }
762 AgentMessageSender::Async(tx) => {
763 tx.send(message).await.map_err(|_| {
764 AgentError::SendMessageFailed("Failed to send input message".to_string())
765 })?;
766 }
767 }
768
769 self.emit_agent_input(agent_id.to_string(), pin);
770
771 Ok(())
772 }
773
774 pub async fn send_agent_out(
775 &self,
776 agent_id: String,
777 ctx: AgentContext,
778 pin: String,
779 value: AgentValue,
780 ) -> Result<(), AgentError> {
781 message::send_agent_out(self, agent_id, ctx, pin, value).await
782 }
783
784 pub fn try_send_agent_out(
785 &self,
786 agent_id: String,
787 ctx: AgentContext,
788 pin: String,
789 value: AgentValue,
790 ) -> Result<(), AgentError> {
791 message::try_send_agent_out(self, agent_id, ctx, pin, value)
792 }
793
794 pub fn write_board_value(&self, name: String, value: AgentValue) -> Result<(), AgentError> {
795 self.try_send_board_out(name, AgentContext::new(), value)
796 }
797
798 pub(crate) fn try_send_board_out(
799 &self,
800 name: String,
801 ctx: AgentContext,
802 value: AgentValue,
803 ) -> Result<(), AgentError> {
804 message::try_send_board_out(self, name, ctx, value)
805 }
806
807 async fn spawn_message_loop(&self) -> Result<(), AgentError> {
808 let (tx, mut rx) = mpsc::channel(4096);
810 {
811 let mut tx_lock = self.tx.lock().unwrap();
812 *tx_lock = Some(tx);
813 }
814
815 let askit = self.clone();
817 tokio::spawn(async move {
818 while let Some(message) = rx.recv().await {
819 use AgentEventMessage::*;
820
821 match message {
822 AgentOut {
823 agent,
824 ctx,
825 pin,
826 value,
827 } => {
828 message::agent_out(&askit, agent, ctx, pin, value).await;
829 }
830 BoardOut { name, ctx, value } => {
831 message::board_out(&askit, name, ctx, value).await;
832 }
833 }
834 }
835 });
836
837 tokio::task::yield_now().await;
838
839 Ok(())
840 }
841
842 async fn start_agent_flows(&self) -> Result<(), AgentError> {
843 let agent_flow_ids;
844 {
845 let agent_flows = self.flows.lock().unwrap();
846 agent_flow_ids = agent_flows.keys().cloned().collect::<Vec<_>>();
847 }
848 for id in agent_flow_ids {
849 self.start_agent_flow(&id).await.unwrap_or_else(|e| {
850 log::error!("Failed to start agent flow: {}", e);
851 });
852 }
853 Ok(())
854 }
855
856 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
857 let mut observers = self.observers.lock().unwrap();
858 let observer_id = new_observer_id();
859 observers.insert(observer_id, observer);
860 observer_id
861 }
862
863 pub fn unsubscribe(&self, observer_id: usize) {
864 let mut observers = self.observers.lock().unwrap();
865 observers.swap_remove(&observer_id);
866 }
867
868 pub(crate) fn emit_agent_display(&self, agent_id: String, key: String, value: AgentValue) {
869 self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, value));
870 }
871
872 pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
873 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
874 }
875
876 pub(crate) fn emit_agent_input(&self, agent_id: String, pin: String) {
877 self.notify_observers(ASKitEvent::AgentIn(agent_id, pin));
878 }
879
880 pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
881 self.notify_observers(ASKitEvent::AgentSpecUpdated(agent_id));
882 }
883
884 pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
885 if name.starts_with('%') {
887 return;
888 }
889 self.notify_observers(ASKitEvent::Board(name, value));
890 }
891
892 fn notify_observers(&self, event: ASKitEvent) {
893 let observers = self.observers.lock().unwrap();
894 for (_id, observer) in observers.iter() {
895 observer.notify(&event);
896 }
897 }
898}
899
900#[derive(Clone, Debug)]
901pub enum ASKitEvent {
902 AgentDisplay(String, String, AgentValue), AgentError(String, String), AgentIn(String, String), AgentSpecUpdated(String), Board(String, AgentValue), }
908
909pub trait ASKitObserver {
910 fn notify(&self, event: &ASKitEvent);
911}
912
/// Process-wide source of observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the next observer id and advances the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
918
919#[derive(Clone)]
922pub enum AgentMessageSender {
923 Sync(std::sync::mpsc::Sender<AgentMessage>),
924 Async(mpsc::Sender<AgentMessage>),
925}