1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::board_agent;
9use crate::config::{AgentConfig, AgentConfigs};
10use crate::context::AgentContext;
11use crate::data::AgentData;
12use crate::definition::{AgentDefaultConfig, AgentDefinition, AgentDefinitions};
13use crate::error::AgentError;
14use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
15use crate::message::{self, AgentEventMessage};
16
17#[derive(Clone)]
18pub struct ASKit {
19 pub(crate) agents:
21 Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,
22
23 pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
25
26 pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
28
29 pub(crate) board_data: Arc<Mutex<HashMap<String, AgentData>>>,
31
32 pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
34
35 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
37
38 pub(crate) flows: Arc<Mutex<AgentFlows>>,
40
41 pub(crate) global_configs: Arc<Mutex<HashMap<String, AgentConfig>>>,
43
44 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
46
47 pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
49}
50
51impl ASKit {
52 pub fn new() -> Self {
53 Self {
54 agents: Default::default(),
55 agent_txs: Default::default(),
56 board_out_agents: Default::default(),
57 board_data: Default::default(),
58 edges: Default::default(),
59 defs: Default::default(),
60 flows: Default::default(),
61 global_configs: Default::default(),
62 tx: Arc::new(Mutex::new(None)),
63 observers: Default::default(),
64 }
65 }
66
67 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68 self.tx
69 .lock()
70 .unwrap()
71 .clone()
72 .ok_or(AgentError::TxNotInitialized)
73 }
74
75 pub fn init() -> Result<Self, AgentError> {
76 let askit = Self::new();
77 askit.register_agents();
78 Ok(askit)
79 }
80
81 fn register_agents(&self) {
82 board_agent::register_agents(self);
83 }
84
85 pub async fn ready(&self) -> Result<(), AgentError> {
86 self.spawn_message_loop()?;
87 self.start_agent_flows().await?;
88 Ok(())
89 }
90
91 pub fn quit(&self) {
92 let mut tx_lock = self.tx.lock().unwrap();
93 *tx_lock = None;
94 }
95
96 pub fn register_agent(&self, def: AgentDefinition) {
97 let def_name = def.name.clone();
98 let def_global_config = def.global_config.clone();
99
100 let mut defs = self.defs.lock().unwrap();
101 defs.insert(def.name.clone(), def);
102
103 if let Some(def_global_config) = def_global_config {
105 let mut new_config = AgentConfig::default();
106 for (key, config_entry) in def_global_config.iter() {
107 new_config.set(key.clone(), config_entry.value.clone());
108 }
109 self.set_global_config(def_name, new_config);
110 }
111 }
112
113 pub fn get_agent_definitions(&self) -> AgentDefinitions {
114 let defs = self.defs.lock().unwrap();
115 defs.clone()
116 }
117
118 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119 let defs = self.defs.lock().unwrap();
120 defs.get(def_name).cloned()
121 }
122
123 pub fn get_agent_default_config(&self, def_name: &str) -> Option<AgentDefaultConfig> {
124 let defs = self.defs.lock().unwrap();
125 let Some(def) = defs.get(def_name) else {
126 return None;
127 };
128 def.default_config.clone()
129 }
130
131 pub fn get_agent_flows(&self) -> AgentFlows {
134 let flows = self.flows.lock().unwrap();
135 flows.clone()
136 }
137
138 pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139 if !Self::is_valid_flow_name(name) {
140 return Err(AgentError::InvalidFlowName(name.into()));
141 }
142
143 let new_name = self.unique_flow_name(name);
144 let mut flows = self.flows.lock().unwrap();
145 let flow = AgentFlow::new(new_name.clone());
146 flows.insert(new_name, flow.clone());
147 Ok(flow)
148 }
149
150 pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
151 if !Self::is_valid_flow_name(new_name) {
152 return Err(AgentError::InvalidFlowName(new_name.into()));
153 }
154
155 let new_name = self.unique_flow_name(new_name);
157
158 let mut flows = self.flows.lock().unwrap();
159
160 let Some(mut flow) = flows.remove(old_name) else {
162 return Err(AgentError::RenameFlowFailed(old_name.into()));
163 };
164
165 flow.set_name(new_name.clone());
167 flows.insert(new_name.clone(), flow);
168 Ok(new_name)
169 }
170
/// Checks whether `new_name` is acceptable as a flow name.
///
/// A name is rejected when it is empty or whitespace-only, when it contains
/// any of `\ : * ? " < > |`, or — for names containing `/` — when it starts or
/// ends with `/`, contains `//`, or has a `.` or `..` path segment.
fn is_valid_flow_name(new_name: &str) -> bool {
    const FORBIDDEN: [char; 8] = ['\\', ':', '*', '?', '"', '<', '>', '|'];

    if new_name.trim().is_empty() {
        return false;
    }

    // Path-shape rules only apply once the name actually has a separator.
    let malformed_path = new_name.contains('/')
        && (new_name.starts_with('/')
            || new_name.ends_with('/')
            || new_name.contains("//")
            || new_name
                .split('/')
                .any(|segment| matches!(segment, "." | "..")));

    let has_forbidden_char = new_name.chars().any(|c| FORBIDDEN.contains(&c));

    !malformed_path && !has_forbidden_char
}
202
203 pub fn unique_flow_name(&self, name: &str) -> String {
204 let mut new_name = name.trim().to_string();
205 let mut i = 2;
206 let flows = self.flows.lock().unwrap();
207 while flows.contains_key(&new_name) {
208 new_name = format!("{}{}", name, i);
209 i += 1;
210 }
211 new_name
212 }
213
214 pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215 let name = agent_flow.name();
216
217 {
219 let mut flows = self.flows.lock().unwrap();
220 if flows.contains_key(name) {
221 return Err(AgentError::DuplicateFlowName(name.into()));
222 }
223 flows.insert(name.into(), agent_flow.clone());
224 }
225
226 for node in agent_flow.nodes().iter() {
228 self.add_agent(name, node).unwrap_or_else(|e| {
229 log::error!("Failed to add_agent_node {}: {}", node.id, e);
230 });
231 }
232
233 for edge in agent_flow.edges().iter() {
235 self.add_edge(edge).unwrap_or_else(|e| {
236 log::error!("Failed to add_edge {}: {}", edge.source, e);
237 });
238 }
239
240 Ok(())
241 }
242
243 pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
244 let flow = {
245 let mut flows = self.flows.lock().unwrap();
246 let Some(flow) = flows.remove(flow_name) else {
247 return Err(AgentError::FlowNotFound(flow_name.to_string()));
248 };
249 flow.clone()
250 };
251
252 flow.stop(self).await?;
253
254 for node in flow.nodes() {
256 self.remove_agent(&node.id).await?;
257 }
258 for edge in flow.edges() {
259 self.remove_edge(edge);
260 }
261
262 Ok(())
263 }
264
265 pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266 let flow_name = flow.name();
267
268 let mut flows = self.flows.lock().unwrap();
269 flows.insert(flow_name.to_string(), flow);
270 Ok(())
271 }
272
273 pub fn new_agent_flow_node(
274 &self,
275 def_name: &str,
276 ) -> Result<AgentFlowNode, AgentError> {
277 let def = self.get_agent_definition(def_name).ok_or_else(|| {
278 AgentError::AgentDefinitionNotFound(def_name.to_string())
279 })?;
280 AgentFlowNode::new(&def)
281 }
282
283 pub fn add_agent_flow_node(
284 &self,
285 flow_name: &str,
286 node: &AgentFlowNode,
287 ) -> Result<(), AgentError> {
288 let mut flows = self.flows.lock().unwrap();
289 let Some(flow) = flows.get_mut(flow_name) else {
290 return Err(AgentError::FlowNotFound(flow_name.to_string()));
291 };
292 flow.add_node(node.clone());
293 self.add_agent(flow_name, node)?;
294 Ok(())
295 }
296
297 pub(crate) fn add_agent(
298 &self,
299 flow_name: &str,
300 node: &AgentFlowNode,
301 ) -> Result<(), AgentError> {
302 let mut agents = self.agents.lock().unwrap();
303 if agents.contains_key(&node.id) {
304 return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
305 }
306 if let Ok(mut agent) = agent_new(
307 self.clone(),
308 node.id.clone(),
309 &node.def_name,
310 node.config.clone(),
311 ) {
312 agent.set_flow_name(flow_name.to_string());
313 agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
314 } else {
315 return Err(AgentError::AgentCreationFailed(node.id.to_string()));
316 }
317 Ok(())
318 }
319
320 pub fn add_agent_flow_edge(
321 &self,
322 flow_name: &str,
323 edge: &AgentFlowEdge,
324 ) -> Result<(), AgentError> {
325 let mut flows = self.flows.lock().unwrap();
326 let Some(flow) = flows.get_mut(flow_name) else {
327 return Err(AgentError::FlowNotFound(flow_name.to_string()));
328 };
329 flow.add_edge(edge.clone());
330 self.add_edge(edge)?;
331 Ok(())
332 }
333
334 pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
335 {
337 let agents = self.agents.lock().unwrap();
338 if !agents.contains_key(&edge.source) {
339 return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
340 }
341 }
342
343 if edge.source_handle.is_empty() {
345 return Err(AgentError::EmptySourceHandle);
346 }
347 if edge.target_handle.is_empty() {
348 return Err(AgentError::EmptyTargetHandle);
349 }
350
351 let mut edges = self.edges.lock().unwrap();
352 if let Some(targets) = edges.get_mut(&edge.source) {
353 if targets
354 .iter()
355 .any(|(target, source_handle, target_handle)| {
356 *target == edge.target
357 && *source_handle == edge.source_handle
358 && *target_handle == edge.target_handle
359 })
360 {
361 return Err(AgentError::EdgeAlreadyExists);
362 }
363 targets.push((
364 edge.target.clone(),
365 edge.source_handle.clone(),
366 edge.target_handle.clone(),
367 ));
368 } else {
369 edges.insert(
370 edge.source.clone(),
371 vec![(
372 edge.target.clone(),
373 edge.source_handle.clone(),
374 edge.target_handle.clone(),
375 )],
376 );
377 }
378 Ok(())
379 }
380
381 pub async fn remove_agent_flow_node(
382 &self,
383 flow_name: &str,
384 node_id: &str,
385 ) -> Result<(), AgentError> {
386 {
387 let mut flows = self.flows.lock().unwrap();
388 let Some(flow) = flows.get_mut(flow_name) else {
389 return Err(AgentError::FlowNotFound(flow_name.to_string()));
390 };
391 flow.remove_node(node_id);
392 }
393 self.remove_agent(node_id).await?;
394 Ok(())
395 }
396
397 pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
398 self.stop_agent(agent_id).await?;
399
400 {
402 let mut edges = self.edges.lock().unwrap();
403 let mut sources_to_remove = Vec::new();
404 for (source, targets) in edges.iter_mut() {
405 targets.retain(|(target, _, _)| target != agent_id);
406 if targets.is_empty() {
407 sources_to_remove.push(source.clone());
408 }
409 }
410 for source in sources_to_remove {
411 edges.remove(&source);
412 }
413 edges.remove(agent_id);
414 }
415
416 {
418 let mut agents = self.agents.lock().unwrap();
419 agents.remove(agent_id);
420 }
421
422 Ok(())
423 }
424
425 pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
426 let mut flows = self.flows.lock().unwrap();
427 let Some(flow) = flows.get_mut(flow_name) else {
428 return Err(AgentError::FlowNotFound(flow_name.to_string()));
429 };
430 let Some(edge) = flow.remove_edge(edge_id) else {
431 return Err(AgentError::EdgeNotFound(edge_id.to_string()));
432 };
433 self.remove_edge(&edge);
434 Ok(())
435 }
436
437 pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
438 let mut edges = self.edges.lock().unwrap();
439 if let Some(targets) = edges.get_mut(&edge.source) {
440 targets.retain(|(target, source_handle, target_handle)| {
441 *target != edge.target
442 || *source_handle != edge.source_handle
443 || *target_handle != edge.target_handle
444 });
445 if targets.is_empty() {
446 edges.remove(&edge.source);
447 }
448 }
449 }
450
451 pub fn copy_sub_flow(
452 &self,
453 nodes: &Vec<AgentFlowNode>,
454 edges: &Vec<AgentFlowEdge>,
455 ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
456 flow::copy_sub_flow(nodes, edges)
457 }
458
459 pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
460 let flow = {
461 let flows = self.flows.lock().unwrap();
462 let Some(flow) = flows.get(name) else {
463 return Err(AgentError::FlowNotFound(name.to_string()));
464 };
465 flow.clone()
466 };
467 flow.start(self).await?;
468 Ok(())
469 }
470
471 pub async fn stop_agent_flow(&self, name: &str) -> Result<(), AgentError> {
472 let flow = {
473 let flows = self.flows.lock().unwrap();
474 let Some(flow) = flows.get(name) else {
475 return Err(AgentError::FlowNotFound(name.to_string()));
476 };
477 flow.clone()
478 };
479 flow.stop(self).await?;
480 Ok(())
481 }
482
483 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
484 let agent = {
485 let agents = self.agents.lock().unwrap();
486 let Some(a) = agents.get(agent_id) else {
487 return Err(AgentError::AgentNotFound(agent_id.to_string()));
488 };
489 a.clone()
490 };
491 let def_name = {
492 let agent = agent.lock().await;
493 agent.def_name().to_string()
494 };
495 let uses_native_thread = {
496 let defs = self.defs.lock().unwrap();
497 let Some(def) = defs.get(&def_name) else {
498 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
499 };
500 def.native_thread
501 };
502 let agent_status = {
503 let agent = agent.lock().await;
504 agent.status().clone()
505 };
506 if agent_status == AgentStatus::Init {
507 log::info!("Starting agent {}", agent_id);
508
509 if uses_native_thread {
510 let (tx, rx) = std::sync::mpsc::channel();
511
512 {
513 let mut agent_txs = self.agent_txs.lock().unwrap();
514 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
515 };
516
517 let agent_id = agent_id.to_string();
518 std::thread::spawn(async move || {
519 if let Err(e) = agent.lock().await.start() {
520 log::error!("Failed to start agent {}: {}", agent_id, e);
521 }
522
523 while let Ok(message) = rx.recv() {
524 match message {
525 AgentMessage::Input { ctx, data } => {
526 agent
527 .lock()
528 .await
529 .process(ctx, data)
530 .await
531 .unwrap_or_else(|e| {
532 log::error!("Process Error {}: {}", agent_id, e);
533 });
534 }
535 AgentMessage::Config { config } => {
536 agent.lock().await.set_config(config).unwrap_or_else(|e| {
537 log::error!("Config Error {}: {}", agent_id, e);
538 });
539 }
540 AgentMessage::Stop => {
541 break;
542 }
543 }
544 }
545 });
546 } else {
547 let (tx, mut rx) = mpsc::channel(32);
548
549 {
550 let mut agent_txs = self.agent_txs.lock().unwrap();
551 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
552 };
553
554 let agent_id = agent_id.to_string();
555 tokio::spawn(async move {
556 {
557 let mut agent_guard = agent.lock().await;
558 if let Err(e) = agent_guard.start() {
559 log::error!("Failed to start agent {}: {}", agent_id, e);
560 }
561 }
562
563 while let Some(message) = rx.recv().await {
564 match message {
565 AgentMessage::Input { ctx, data } => {
566 agent
567 .lock()
568 .await
569 .process(ctx, data)
570 .await
571 .unwrap_or_else(|e| {
572 log::error!("Process Error {}: {}", agent_id, e);
573 });
574 }
575 AgentMessage::Config { config } => {
576 agent.lock().await.set_config(config).unwrap_or_else(|e| {
577 log::error!("Config Error {}: {}", agent_id, e);
578 });
579 }
580 AgentMessage::Stop => {
581 rx.close();
582 return;
583 }
584 }
585 }
586 });
587 }
588 }
589 Ok(())
590 }
591
592 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
593 let agent = {
594 let agents = self.agents.lock().unwrap();
595 let Some(a) = agents.get(agent_id) else {
596 return Err(AgentError::AgentNotFound(agent_id.to_string()));
597 };
598 a.clone()
599 };
600
601 let agent_status = {
602 let agent = agent.lock().await;
603 agent.status().clone()
604 };
605 if agent_status == AgentStatus::Start {
606 log::info!("Stopping agent {}", agent_id);
607
608 {
609 let mut agent_txs = self.agent_txs.lock().unwrap();
610 if let Some(tx) = agent_txs.remove(agent_id) {
611 match tx {
612 AgentMessageSender::Sync(tx) => {
613 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
614 log::error!(
615 "Failed to send stop message to agent {}: {}",
616 agent_id,
617 e
618 );
619 });
620 }
621 AgentMessageSender::Async(tx) => {
622 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
623 log::error!(
624 "Failed to send stop message to agent {}: {}",
625 agent_id,
626 e
627 );
628 });
629 }
630 }
631 }
632 }
633
634 agent.lock().await.stop()?;
635 }
636
637 Ok(())
638 }
639
640 pub async fn set_agent_config(
641 &self,
642 agent_id: String,
643 config: AgentConfig,
644 ) -> Result<(), AgentError> {
645 let agent = {
646 let agents = self.agents.lock().unwrap();
647 let Some(a) = agents.get(&agent_id) else {
648 return Err(AgentError::AgentNotFound(agent_id.to_string()));
649 };
650 a.clone()
651 };
652
653 let agent_status = {
654 let agent = agent.lock().await;
655 agent.status().clone()
656 };
657 if agent_status == AgentStatus::Init {
658 agent.lock().await.set_config(config.clone())?;
659 } else if agent_status == AgentStatus::Start {
660 let tx = {
661 let agent_txs = self.agent_txs.lock().unwrap();
662 let Some(tx) = agent_txs.get(&agent_id) else {
663 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
664 };
665 tx.clone()
666 };
667 let message = AgentMessage::Config { config };
668 match tx {
669 AgentMessageSender::Sync(tx) => {
670 tx.send(message).map_err(|_| {
671 AgentError::SendMessageFailed("Failed to send config message".to_string())
672 })?;
673 }
674 AgentMessageSender::Async(tx) => {
675 tx.send(message).await.map_err(|_| {
676 AgentError::SendMessageFailed("Failed to send config message".to_string())
677 })?;
678 }
679 }
680 }
681 Ok(())
682 }
683
684 pub fn get_global_config(&self, def_name: &str) -> Option<AgentConfig> {
685 let global_configs = self.global_configs.lock().unwrap();
686 global_configs.get(def_name).cloned()
687 }
688
689 pub fn set_global_config(&self, def_name: String, config: AgentConfig) {
690 let mut global_configs = self.global_configs.lock().unwrap();
691
692 let Some(existing_config) = global_configs.get_mut(&def_name) else {
693 global_configs.insert(def_name, config);
694 return;
695 };
696
697 for (key, value) in config {
698 existing_config.set(key, value);
699 }
700 }
701
702 pub fn set_global_configs(&self, new_configs: AgentConfigs) {
703 for (agent_name, new_config) in new_configs {
704 self.set_global_config(agent_name, new_config);
705 }
706 }
707
708 pub fn get_global_configs(&self) -> AgentConfigs {
709 let global_configs = self.global_configs.lock().unwrap();
710 global_configs.clone()
711 }
712
713 pub(crate) async fn agent_input(
714 &self,
715 agent_id: String,
716 ctx: AgentContext,
717 data: AgentData,
718 ) -> Result<(), AgentError> {
719 let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
720 let agents = self.agents.lock().unwrap();
721 let Some(a) = agents.get(&agent_id) else {
722 return Err(AgentError::AgentNotFound(agent_id.to_string()));
723 };
724 a.clone()
725 };
726
727 let agent_status = {
728 let agent = agent.lock().await;
729 agent.status().clone()
730 };
731 if agent_status == AgentStatus::Start {
732 let ch = ctx.port().to_string();
733 let message = AgentMessage::Input { ctx, data };
734
735 let tx = {
736 let agent_txs = self.agent_txs.lock().unwrap();
737 let Some(tx) = agent_txs.get(&agent_id) else {
738 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
739 };
740 tx.clone()
741 };
742 match tx {
743 AgentMessageSender::Sync(tx) => {
744 tx.send(message).map_err(|_| {
745 AgentError::SendMessageFailed("Failed to send input message".to_string())
746 })?;
747 }
748 AgentMessageSender::Async(tx) => {
749 tx.send(message).await.map_err(|_| {
750 AgentError::SendMessageFailed("Failed to send input message".to_string())
751 })?;
752 }
753 }
754
755 self.emit_input(agent_id.to_string(), ch);
756 }
757 Ok(())
758 }
759
760 pub async fn send_agent_out(
761 &self,
762 agent_id: String,
763 ctx: AgentContext,
764 data: AgentData,
765 ) -> Result<(), AgentError> {
766 message::send_agent_out(self, agent_id, ctx, data).await
767 }
768
769 pub fn try_send_agent_out(
770 &self,
771 agent_id: String,
772 ctx: AgentContext,
773 data: AgentData,
774 ) -> Result<(), AgentError> {
775 message::try_send_agent_out(self, agent_id, ctx, data)
776 }
777
778 pub fn write_board_data(&self, name: String, data: AgentData) -> Result<(), AgentError> {
779 self.try_send_board_out(name, AgentContext::new(), data)
780 }
781
782 pub(crate) fn try_send_board_out(
783 &self,
784 name: String,
785 ctx: AgentContext,
786 data: AgentData,
787 ) -> Result<(), AgentError> {
788 message::try_send_board_out(self, name, ctx, data)
789 }
790
791 fn spawn_message_loop(&self) -> Result<(), AgentError> {
792 let (tx, mut rx) = mpsc::channel(4096);
794 {
795 let mut tx_lock = self.tx.lock().unwrap();
796 *tx_lock = Some(tx);
797 }
798
799 let askit = self.clone();
801 tokio::spawn(async move {
802 while let Some(message) = rx.recv().await {
803 use AgentEventMessage::*;
804
805 match message {
806 AgentOut { agent, ctx, data } => {
807 message::agent_out(&askit, agent, ctx, data).await;
808 }
809 BoardOut { name, ctx, data } => {
810 message::board_out(&askit, name, ctx, data).await;
811 }
812 }
813 }
814 });
815
816 Ok(())
817 }
818
819 async fn start_agent_flows(&self) -> Result<(), AgentError> {
820 let agent_flow_names;
821 {
822 let agent_flows = self.flows.lock().unwrap();
823 agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
824 }
825 for name in agent_flow_names {
826 self.start_agent_flow(&name).await.unwrap_or_else(|e| {
827 log::error!("Failed to start agent flow: {}", e);
828 });
829 }
830 Ok(())
831 }
832
833 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
834 let mut observers = self.observers.lock().unwrap();
835 let observer_id = new_observer_id();
836 observers.insert(observer_id, observer);
837 observer_id
838 }
839
840 pub fn unsubscribe(&self, observer_id: usize) {
841 let mut observers = self.observers.lock().unwrap();
842 observers.remove(&observer_id);
843 }
844
845 pub(crate) fn emit_error(&self, agent_id: String, message: String) {
846 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
847 }
848
849 pub(crate) fn emit_input(&self, agent_id: String, ch: String) {
850 self.notify_observers(ASKitEvent::AgentIn(agent_id, ch));
851 }
852
853 pub(crate) fn emit_display(&self, agent_id: String, key: String, data: AgentData) {
854 self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, data));
855 }
856
857 pub(crate) fn emit_board(&self, name: String, data: AgentData) {
858 self.notify_observers(ASKitEvent::Board(name, data));
859 }
860
861 fn notify_observers(&self, event: ASKitEvent) {
862 let observers = self.observers.lock().unwrap();
863 for (_id, observer) in observers.iter() {
864 observer.notify(&event);
865 }
866 }
867}
868
869#[derive(Clone, Debug)]
870pub enum ASKitEvent {
871 AgentIn(String, String), AgentDisplay(String, String, AgentData), AgentError(String, String), Board(String, AgentData), }
876
877pub trait ASKitObserver {
878 fn notify(&self, event: &ASKitEvent);
879}
880
/// Process-wide counter backing observer ids; the first id handed out is 1.
static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Returns the current counter value and increments the counter by one.
fn new_observer_id() -> usize {
    use std::sync::atomic::Ordering;

    OBSERVER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
886
887#[derive(Clone)]
890pub enum AgentMessageSender {
891 Sync(std::sync::mpsc::Sender<AgentMessage>),
892 Async(mpsc::Sender<AgentMessage>),
893}