1use std::collections::HashMap;
2use std::sync::atomic::AtomicUsize;
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::{Mutex as AsyncMutex, mpsc};
6
7use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
8use crate::board_agent;
9use crate::config::{AgentConfig, AgentConfigs};
10use crate::context::AgentContext;
11use crate::data::AgentData;
12use crate::definition::{AgentDefaultConfig, AgentDefinition, AgentDefinitions};
13use crate::error::AgentError;
14use crate::flow::{self, AgentFlow, AgentFlowEdge, AgentFlowNode, AgentFlows};
15use crate::message::{self, AgentEventMessage};
16
17#[derive(Clone)]
18pub struct ASKit {
19 pub(crate) agents:
21 Arc<Mutex<HashMap<String, Arc<AsyncMutex<Box<dyn Agent + Send + Sync + 'static>>>>>>,
22
23 pub(crate) agent_txs: Arc<Mutex<HashMap<String, AgentMessageSender>>>,
25
26 pub(crate) board_out_agents: Arc<Mutex<HashMap<String, Vec<String>>>>,
28
29 pub(crate) board_data: Arc<Mutex<HashMap<String, AgentData>>>,
31
32 pub(crate) edges: Arc<Mutex<HashMap<String, Vec<(String, String, String)>>>>,
34
35 pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
37
38 pub(crate) flows: Arc<Mutex<AgentFlows>>,
40
41 pub(crate) global_configs: Arc<Mutex<HashMap<String, AgentConfig>>>,
43
44 pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
46
47 pub(crate) observers: Arc<Mutex<HashMap<usize, Box<dyn ASKitObserver + Sync + Send>>>>,
49}
50
51impl ASKit {
52 pub fn new() -> Self {
53 Self {
54 agents: Default::default(),
55 agent_txs: Default::default(),
56 board_out_agents: Default::default(),
57 board_data: Default::default(),
58 edges: Default::default(),
59 defs: Default::default(),
60 flows: Default::default(),
61 global_configs: Default::default(),
62 tx: Arc::new(Mutex::new(None)),
63 observers: Default::default(),
64 }
65 }
66
67 pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
68 self.tx
69 .lock()
70 .unwrap()
71 .clone()
72 .ok_or(AgentError::TxNotInitialized)
73 }
74
75 pub fn init() -> Result<Self, AgentError> {
76 let askit = Self::new();
77 askit.register_agents();
78 Ok(askit)
79 }
80
81 fn register_agents(&self) {
82 board_agent::register_agents(self);
83 }
84
85 pub async fn ready(&self) -> Result<(), AgentError> {
86 self.spawn_message_loop()?;
87 self.start_agent_flows().await?;
88 Ok(())
89 }
90
91 pub fn quit(&self) {
92 let mut tx_lock = self.tx.lock().unwrap();
93 *tx_lock = None;
94 }
95
96 pub fn register_agent(&self, def: AgentDefinition) {
97 let def_name = def.name.clone();
98 let def_global_config = def.global_config.clone();
99
100 let mut defs = self.defs.lock().unwrap();
101 defs.insert(def.name.clone(), def);
102
103 if let Some(def_global_config) = def_global_config {
105 let mut new_config = AgentConfig::default();
106 for (key, config_entry) in def_global_config.iter() {
107 new_config.set(key.clone(), config_entry.value.clone());
108 }
109 self.set_global_config(def_name, new_config);
110 }
111 }
112
113 pub fn get_agent_definitions(&self) -> AgentDefinitions {
114 let defs = self.defs.lock().unwrap();
115 defs.clone()
116 }
117
118 pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
119 let defs = self.defs.lock().unwrap();
120 defs.get(def_name).cloned()
121 }
122
123 pub fn get_agent_default_config(&self, def_name: &str) -> Option<AgentDefaultConfig> {
124 let defs = self.defs.lock().unwrap();
125 let Some(def) = defs.get(def_name) else {
126 return None;
127 };
128 def.default_config.clone()
129 }
130
131 pub fn get_agent_flows(&self) -> AgentFlows {
134 let flows = self.flows.lock().unwrap();
135 flows.clone()
136 }
137
138 pub fn new_agent_flow(&self, name: &str) -> Result<AgentFlow, AgentError> {
139 if !Self::is_valid_flow_name(name) {
140 return Err(AgentError::InvalidFlowName(name.into()));
141 }
142
143 let new_name = self.unique_flow_name(name);
144 let mut flows = self.flows.lock().unwrap();
145 let flow = AgentFlow::new(new_name.clone());
146 flows.insert(new_name, flow.clone());
147 Ok(flow)
148 }
149
150 pub fn rename_agent_flow(&self, old_name: &str, new_name: &str) -> Result<String, AgentError> {
151 if !Self::is_valid_flow_name(new_name) {
152 return Err(AgentError::InvalidFlowName(new_name.into()));
153 }
154
155 let new_name = self.unique_flow_name(new_name);
157
158 let mut flows = self.flows.lock().unwrap();
159
160 let Some(mut flow) = flows.remove(old_name) else {
162 return Err(AgentError::RenameFlowFailed(old_name.into()));
163 };
164
165 flow.set_name(new_name.clone());
167 flows.insert(new_name.clone(), flow);
168 Ok(new_name)
169 }
170
171 fn is_valid_flow_name(new_name: &str) -> bool {
172 if new_name.trim().is_empty() {
174 return false;
175 }
176
177 if new_name.contains('/') {
179 if new_name.starts_with('/') || new_name.ends_with('/') || new_name.contains("//") {
181 return false;
182 }
183 if new_name
185 .split('/')
186 .any(|segment| segment == "." || segment == "..")
187 {
188 return false;
189 }
190 }
191
192 let invalid_chars = ['\\', ':', '*', '?', '"', '<', '>', '|'];
194 for c in invalid_chars {
195 if new_name.contains(c) {
196 return false;
197 }
198 }
199
200 true
201 }
202
203 pub fn unique_flow_name(&self, name: &str) -> String {
204 let mut new_name = name.trim().to_string();
205 let mut i = 2;
206 let flows = self.flows.lock().unwrap();
207 while flows.contains_key(&new_name) {
208 new_name = format!("{}{}", name, i);
209 i += 1;
210 }
211 new_name
212 }
213
214 pub fn add_agent_flow(&self, agent_flow: &AgentFlow) -> Result<(), AgentError> {
215 let name = agent_flow.name();
216
217 {
219 let mut flows = self.flows.lock().unwrap();
220 if flows.contains_key(name) {
221 return Err(AgentError::DuplicateFlowName(name.into()));
222 }
223 flows.insert(name.into(), agent_flow.clone());
224 }
225
226 for node in agent_flow.nodes().iter() {
228 self.add_agent(name, node).unwrap_or_else(|e| {
229 log::error!("Failed to add_agent_node {}: {}", node.id, e);
230 });
231 }
232
233 for edge in agent_flow.edges().iter() {
235 self.add_edge(edge).unwrap_or_else(|e| {
236 log::error!("Failed to add_edge {}: {}", edge.source, e);
237 });
238 }
239
240 Ok(())
241 }
242
243 pub async fn remove_agent_flow(&self, flow_name: &str) -> Result<(), AgentError> {
244 let flow = {
245 let mut flows = self.flows.lock().unwrap();
246 let Some(flow) = flows.remove(flow_name) else {
247 return Err(AgentError::FlowNotFound(flow_name.to_string()));
248 };
249 flow.clone()
250 };
251
252 flow.stop(self).await?;
253
254 for node in flow.nodes() {
256 self.remove_agent(&node.id).await?;
257 }
258 for edge in flow.edges() {
259 self.remove_edge(edge);
260 }
261
262 Ok(())
263 }
264
265 pub fn insert_agent_flow(&self, flow: AgentFlow) -> Result<(), AgentError> {
266 let flow_name = flow.name();
267
268 let mut flows = self.flows.lock().unwrap();
269 flows.insert(flow_name.to_string(), flow);
270 Ok(())
271 }
272
273 pub fn add_agent_flow_node(
274 &self,
275 flow_name: &str,
276 node: &AgentFlowNode,
277 ) -> Result<(), AgentError> {
278 let mut flows = self.flows.lock().unwrap();
279 let Some(flow) = flows.get_mut(flow_name) else {
280 return Err(AgentError::FlowNotFound(flow_name.to_string()));
281 };
282 flow.add_node(node.clone());
283 self.add_agent(flow_name, node)?;
284 Ok(())
285 }
286
287 pub(crate) fn add_agent(
288 &self,
289 flow_name: &str,
290 node: &AgentFlowNode,
291 ) -> Result<(), AgentError> {
292 let mut agents = self.agents.lock().unwrap();
293 if agents.contains_key(&node.id) {
294 return Err(AgentError::AgentAlreadyExists(node.id.to_string()));
295 }
296 if let Ok(mut agent) = agent_new(
297 self.clone(),
298 node.id.clone(),
299 &node.def_name,
300 node.config.clone(),
301 ) {
302 agent.set_flow_name(flow_name.to_string());
303 agents.insert(node.id.clone(), Arc::new(AsyncMutex::new(agent)));
304 } else {
305 return Err(AgentError::AgentCreationFailed(node.id.to_string()));
306 }
307 Ok(())
308 }
309
310 pub fn add_agent_flow_edge(
311 &self,
312 flow_name: &str,
313 edge: &AgentFlowEdge,
314 ) -> Result<(), AgentError> {
315 let mut flows = self.flows.lock().unwrap();
316 let Some(flow) = flows.get_mut(flow_name) else {
317 return Err(AgentError::FlowNotFound(flow_name.to_string()));
318 };
319 flow.add_edge(edge.clone());
320 self.add_edge(edge)?;
321 Ok(())
322 }
323
324 pub(crate) fn add_edge(&self, edge: &AgentFlowEdge) -> Result<(), AgentError> {
325 {
327 let agents = self.agents.lock().unwrap();
328 if !agents.contains_key(&edge.source) {
329 return Err(AgentError::SourceAgentNotFound(edge.source.to_string()));
330 }
331 }
332
333 if edge.source_handle.is_empty() {
335 return Err(AgentError::EmptySourceHandle);
336 }
337 if edge.target_handle.is_empty() {
338 return Err(AgentError::EmptyTargetHandle);
339 }
340
341 let mut edges = self.edges.lock().unwrap();
342 if let Some(targets) = edges.get_mut(&edge.source) {
343 if targets
344 .iter()
345 .any(|(target, source_handle, target_handle)| {
346 *target == edge.target
347 && *source_handle == edge.source_handle
348 && *target_handle == edge.target_handle
349 })
350 {
351 return Err(AgentError::EdgeAlreadyExists);
352 }
353 targets.push((
354 edge.target.clone(),
355 edge.source_handle.clone(),
356 edge.target_handle.clone(),
357 ));
358 } else {
359 edges.insert(
360 edge.source.clone(),
361 vec![(
362 edge.target.clone(),
363 edge.source_handle.clone(),
364 edge.target_handle.clone(),
365 )],
366 );
367 }
368 Ok(())
369 }
370
371 pub async fn remove_agent_flow_node(
372 &self,
373 flow_name: &str,
374 node_id: &str,
375 ) -> Result<(), AgentError> {
376 {
377 let mut flows = self.flows.lock().unwrap();
378 let Some(flow) = flows.get_mut(flow_name) else {
379 return Err(AgentError::FlowNotFound(flow_name.to_string()));
380 };
381 flow.remove_node(node_id);
382 }
383 self.remove_agent(node_id).await?;
384 Ok(())
385 }
386
387 pub(crate) async fn remove_agent(&self, agent_id: &str) -> Result<(), AgentError> {
388 self.stop_agent(agent_id).await?;
389
390 {
392 let mut edges = self.edges.lock().unwrap();
393 let mut sources_to_remove = Vec::new();
394 for (source, targets) in edges.iter_mut() {
395 targets.retain(|(target, _, _)| target != agent_id);
396 if targets.is_empty() {
397 sources_to_remove.push(source.clone());
398 }
399 }
400 for source in sources_to_remove {
401 edges.remove(&source);
402 }
403 edges.remove(agent_id);
404 }
405
406 {
408 let mut agents = self.agents.lock().unwrap();
409 agents.remove(agent_id);
410 }
411
412 Ok(())
413 }
414
415 pub fn remove_agent_flow_edge(&self, flow_name: &str, edge_id: &str) -> Result<(), AgentError> {
416 let mut flows = self.flows.lock().unwrap();
417 let Some(flow) = flows.get_mut(flow_name) else {
418 return Err(AgentError::FlowNotFound(flow_name.to_string()));
419 };
420 let Some(edge) = flow.remove_edge(edge_id) else {
421 return Err(AgentError::EdgeNotFound(edge_id.to_string()));
422 };
423 self.remove_edge(&edge);
424 Ok(())
425 }
426
427 pub(crate) fn remove_edge(&self, edge: &AgentFlowEdge) {
428 let mut edges = self.edges.lock().unwrap();
429 if let Some(targets) = edges.get_mut(&edge.source) {
430 targets.retain(|(target, source_handle, target_handle)| {
431 *target != edge.target
432 || *source_handle != edge.source_handle
433 || *target_handle != edge.target_handle
434 });
435 if targets.is_empty() {
436 edges.remove(&edge.source);
437 }
438 }
439 }
440
441 pub fn copy_sub_flow(
442 &self,
443 nodes: &Vec<AgentFlowNode>,
444 edges: &Vec<AgentFlowEdge>,
445 ) -> (Vec<AgentFlowNode>, Vec<AgentFlowEdge>) {
446 flow::copy_sub_flow(nodes, edges)
447 }
448
449 pub async fn start_agent_flow(&self, name: &str) -> Result<(), AgentError> {
450 let flows = self.flows.lock().unwrap();
451 let Some(flow) = flows.get(name) else {
452 return Err(AgentError::FlowNotFound(name.to_string()));
453 };
454 flow.start(self).await?;
455 Ok(())
456 }
457
458 pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
459 let agent = {
460 let agents = self.agents.lock().unwrap();
461 let Some(a) = agents.get(agent_id) else {
462 return Err(AgentError::AgentNotFound(agent_id.to_string()));
463 };
464 a.clone()
465 };
466 let def_name = {
467 let agent = agent.lock().await;
468 agent.def_name().to_string()
469 };
470 let uses_native_thread = {
471 let defs = self.defs.lock().unwrap();
472 let Some(def) = defs.get(&def_name) else {
473 return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
474 };
475 def.native_thread
476 };
477 let agent_status = {
478 let agent = agent.lock().await;
479 agent.status().clone()
480 };
481 if agent_status == AgentStatus::Init {
482 log::info!("Starting agent {}", agent_id);
483
484 if uses_native_thread {
485 let (tx, rx) = std::sync::mpsc::channel();
486
487 {
488 let mut agent_txs = self.agent_txs.lock().unwrap();
489 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Sync(tx.clone()));
490 };
491
492 let agent_id = agent_id.to_string();
493 std::thread::spawn(async move || {
494 if let Err(e) = agent.lock().await.start() {
495 log::error!("Failed to start agent {}: {}", agent_id, e);
496 }
497
498 while let Ok(message) = rx.recv() {
499 match message {
500 AgentMessage::Input { ctx, data } => {
501 agent
502 .lock()
503 .await
504 .process(ctx, data)
505 .await
506 .unwrap_or_else(|e| {
507 log::error!("Process Error {}: {}", agent_id, e);
508 });
509 }
510 AgentMessage::Config { config } => {
511 agent.lock().await.set_config(config).unwrap_or_else(|e| {
512 log::error!("Config Error {}: {}", agent_id, e);
513 });
514 }
515 AgentMessage::Stop => {
516 break;
517 }
518 }
519 }
520 });
521 } else {
522 let (tx, mut rx) = mpsc::channel(32);
523
524 {
525 let mut agent_txs = self.agent_txs.lock().unwrap();
526 agent_txs.insert(agent_id.to_string(), AgentMessageSender::Async(tx.clone()));
527 };
528
529 let agent_id = agent_id.to_string();
530 tokio::spawn(async move {
531 {
532 let mut agent_guard = agent.lock().await;
533 if let Err(e) = agent_guard.start() {
534 log::error!("Failed to start agent {}: {}", agent_id, e);
535 }
536 }
537
538 while let Some(message) = rx.recv().await {
539 match message {
540 AgentMessage::Input { ctx, data } => {
541 agent
542 .lock()
543 .await
544 .process(ctx, data)
545 .await
546 .unwrap_or_else(|e| {
547 log::error!("Process Error {}: {}", agent_id, e);
548 });
549 }
550 AgentMessage::Config { config } => {
551 agent.lock().await.set_config(config).unwrap_or_else(|e| {
552 log::error!("Config Error {}: {}", agent_id, e);
553 });
554 }
555 AgentMessage::Stop => {
556 rx.close();
557 return;
558 }
559 }
560 }
561 });
562 }
563 }
564 Ok(())
565 }
566
567 pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
568 let agent = {
569 let agents = self.agents.lock().unwrap();
570 let Some(a) = agents.get(agent_id) else {
571 return Err(AgentError::AgentNotFound(agent_id.to_string()));
572 };
573 a.clone()
574 };
575
576 let agent_status = {
577 let agent = agent.lock().await;
578 agent.status().clone()
579 };
580 if agent_status == AgentStatus::Start {
581 log::info!("Stopping agent {}", agent_id);
582
583 {
584 let mut agent_txs = self.agent_txs.lock().unwrap();
585 if let Some(tx) = agent_txs.remove(agent_id) {
586 match tx {
587 AgentMessageSender::Sync(tx) => {
588 tx.send(AgentMessage::Stop).unwrap_or_else(|e| {
589 log::error!(
590 "Failed to send stop message to agent {}: {}",
591 agent_id,
592 e
593 );
594 });
595 }
596 AgentMessageSender::Async(tx) => {
597 tx.try_send(AgentMessage::Stop).unwrap_or_else(|e| {
598 log::error!(
599 "Failed to send stop message to agent {}: {}",
600 agent_id,
601 e
602 );
603 });
604 }
605 }
606 }
607 }
608
609 agent.lock().await.stop()?;
610 }
611
612 Ok(())
613 }
614
615 pub async fn set_agent_config(
616 &self,
617 agent_id: String,
618 config: AgentConfig,
619 ) -> Result<(), AgentError> {
620 let agent = {
621 let agents = self.agents.lock().unwrap();
622 let Some(a) = agents.get(&agent_id) else {
623 return Err(AgentError::AgentNotFound(agent_id.to_string()));
624 };
625 a.clone()
626 };
627
628 let agent_status = {
629 let agent = agent.lock().await;
630 agent.status().clone()
631 };
632 if agent_status == AgentStatus::Init {
633 agent.lock().await.set_config(config.clone())?;
634 } else if agent_status == AgentStatus::Start {
635 let tx = {
636 let agent_txs = self.agent_txs.lock().unwrap();
637 let Some(tx) = agent_txs.get(&agent_id) else {
638 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
639 };
640 tx.clone()
641 };
642 let message = AgentMessage::Config { config };
643 match tx {
644 AgentMessageSender::Sync(tx) => {
645 tx.send(message).map_err(|_| {
646 AgentError::SendMessageFailed("Failed to send config message".to_string())
647 })?;
648 }
649 AgentMessageSender::Async(tx) => {
650 tx.send(message).await.map_err(|_| {
651 AgentError::SendMessageFailed("Failed to send config message".to_string())
652 })?;
653 }
654 }
655 }
656 Ok(())
657 }
658
659 pub fn get_global_config(&self, def_name: &str) -> Option<AgentConfig> {
660 let global_configs = self.global_configs.lock().unwrap();
661 global_configs.get(def_name).cloned()
662 }
663
664 pub fn set_global_config(&self, def_name: String, config: AgentConfig) {
665 let mut global_configs = self.global_configs.lock().unwrap();
666
667 let Some(existing_config) = global_configs.get_mut(&def_name) else {
668 global_configs.insert(def_name, config);
669 return;
670 };
671
672 for (key, value) in config {
673 existing_config.set(key, value);
674 }
675 }
676
677 pub fn set_global_configs(&self, new_configs: AgentConfigs) {
678 for (agent_name, new_config) in new_configs {
679 self.set_global_config(agent_name, new_config);
680 }
681 }
682
683 pub fn get_global_configs(&self) -> AgentConfigs {
684 let global_configs = self.global_configs.lock().unwrap();
685 global_configs.clone()
686 }
687
688 pub(crate) async fn agent_input(
689 &self,
690 agent_id: String,
691 ctx: AgentContext,
692 data: AgentData,
693 ) -> Result<(), AgentError> {
694 let agent: Arc<AsyncMutex<Box<dyn Agent + Send + Sync>>> = {
695 let agents = self.agents.lock().unwrap();
696 let Some(a) = agents.get(&agent_id) else {
697 return Err(AgentError::AgentNotFound(agent_id.to_string()));
698 };
699 a.clone()
700 };
701
702 let agent_status = {
703 let agent = agent.lock().await;
704 agent.status().clone()
705 };
706 if agent_status == AgentStatus::Start {
707 let ch = ctx.port().to_string();
708 let message = AgentMessage::Input { ctx, data };
709
710 let tx = {
711 let agent_txs = self.agent_txs.lock().unwrap();
712 let Some(tx) = agent_txs.get(&agent_id) else {
713 return Err(AgentError::AgentTxNotFound(agent_id.to_string()));
714 };
715 tx.clone()
716 };
717 match tx {
718 AgentMessageSender::Sync(tx) => {
719 tx.send(message).map_err(|_| {
720 AgentError::SendMessageFailed("Failed to send input message".to_string())
721 })?;
722 }
723 AgentMessageSender::Async(tx) => {
724 tx.send(message).await.map_err(|_| {
725 AgentError::SendMessageFailed("Failed to send input message".to_string())
726 })?;
727 }
728 }
729
730 self.emit_input(agent_id.to_string(), ch);
731 }
732 Ok(())
733 }
734
735 pub async fn send_agent_out(
736 &self,
737 agent_id: String,
738 ctx: AgentContext,
739 data: AgentData,
740 ) -> Result<(), AgentError> {
741 message::send_agent_out(self, agent_id, ctx, data).await
742 }
743
744 pub fn try_send_agent_out(
745 &self,
746 agent_id: String,
747 ctx: AgentContext,
748 data: AgentData,
749 ) -> Result<(), AgentError> {
750 message::try_send_agent_out(self, agent_id, ctx, data)
751 }
752
753 pub fn write_board_data(&self, name: String, data: AgentData) -> Result<(), AgentError> {
754 self.try_send_board_out(name, AgentContext::new(), data)
755 }
756
757 pub(crate) fn try_send_board_out(
758 &self,
759 name: String,
760 ctx: AgentContext,
761 data: AgentData,
762 ) -> Result<(), AgentError> {
763 message::try_send_board_out(self, name, ctx, data)
764 }
765
766 fn spawn_message_loop(&self) -> Result<(), AgentError> {
767 let (tx, mut rx) = mpsc::channel(4096);
769 {
770 let mut tx_lock = self.tx.lock().unwrap();
771 *tx_lock = Some(tx);
772 }
773
774 let askit = self.clone();
776 tokio::spawn(async move {
777 while let Some(message) = rx.recv().await {
778 use AgentEventMessage::*;
779
780 match message {
781 AgentOut { agent, ctx, data } => {
782 message::agent_out(&askit, agent, ctx, data).await;
783 }
784 BoardOut { name, ctx, data } => {
785 message::board_out(&askit, name, ctx, data).await;
786 }
787 }
788 }
789 });
790
791 Ok(())
792 }
793
794 async fn start_agent_flows(&self) -> Result<(), AgentError> {
795 let agent_flow_names;
796 {
797 let agent_flows = self.flows.lock().unwrap();
798 agent_flow_names = agent_flows.keys().cloned().collect::<Vec<_>>();
799 }
800 for name in agent_flow_names {
801 self.start_agent_flow(&name).await.unwrap_or_else(|e| {
802 log::error!("Failed to start agent flow: {}", e);
803 });
804 }
805 Ok(())
806 }
807
808 pub fn subscribe(&self, observer: Box<dyn ASKitObserver + Sync + Send>) -> usize {
809 let mut observers = self.observers.lock().unwrap();
810 let observer_id = new_observer_id();
811 observers.insert(observer_id, observer);
812 observer_id
813 }
814
815 pub fn unsubscribe(&self, observer_id: usize) {
816 let mut observers = self.observers.lock().unwrap();
817 observers.remove(&observer_id);
818 }
819
820 pub(crate) fn emit_error(&self, agent_id: String, message: String) {
821 self.notify_observers(ASKitEvent::AgentError(agent_id, message));
822 }
823
824 pub(crate) fn emit_input(&self, agent_id: String, ch: String) {
825 self.notify_observers(ASKitEvent::AgentIn(agent_id, ch));
826 }
827
828 pub(crate) fn emit_display(&self, agent_id: String, key: String, data: AgentData) {
829 self.notify_observers(ASKitEvent::AgentDisplay(agent_id, key, data));
830 }
831
832 pub(crate) fn emit_board(&self, name: String, data: AgentData) {
833 self.notify_observers(ASKitEvent::Board(name, data));
834 }
835
836 fn notify_observers(&self, event: ASKitEvent) {
837 let observers = self.observers.lock().unwrap();
838 for (_id, observer) in observers.iter() {
839 observer.notify(&event);
840 }
841 }
842}
843
844#[derive(Clone, Debug)]
845pub enum ASKitEvent {
846 AgentIn(String, String), AgentDisplay(String, String, AgentData), AgentError(String, String), Board(String, AgentData), }
851
852pub trait ASKitObserver {
853 fn notify(&self, event: &ASKitEvent);
854}
855
856static OBSERVER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
857
858fn new_observer_id() -> usize {
859 OBSERVER_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
860}
861
862#[derive(Clone)]
865pub enum AgentMessageSender {
866 Sync(std::sync::mpsc::Sender<AgentMessage>),
867 Async(mpsc::Sender<AgentMessage>),
868}