1pub mod types;
14
15pub use types::*;
17
18use crate::llm::LLMClient;
19use async_trait::async_trait;
20use std::collections::HashMap;
21use std::sync::Arc;
22use tokio::sync::RwLock;
23
/// Returns a short, static human-readable description (Chinese) for a
/// collaboration mode, suitable for logging or UI display.
///
/// The match is exhaustive over `CollaborationMode`, so adding a new variant
/// will produce a compile error here — intentional, to keep descriptions in
/// sync with the enum.
fn get_mode_description(mode: &CollaborationMode) -> &'static str {
    match mode {
        CollaborationMode::RequestResponse => "一对一的请求-响应,适合确定性任务",
        CollaborationMode::PublishSubscribe => "一对多的发布订阅,适合发散性任务",
        CollaborationMode::Consensus => "多 Agent 共识决策,适合需要达成一致的场景",
        CollaborationMode::Debate => "多轮辩论优化,适合需要迭代改进的审查任务",
        CollaborationMode::Parallel => "并行处理独立任务,适合可分解的工作",
        CollaborationMode::Sequential => "顺序执行有依赖的任务链",
        CollaborationMode::Custom(_) => "自定义模式",
    }
}
36
/// Shared per-agent state and LLM plumbing reused by every collaboration
/// protocol in this module.
pub struct LLMProtocolHelper {
    /// Identifier of the agent this helper acts on behalf of.
    agent_id: String,
    /// Optional LLM backend; `None` until configured via `with_llm`.
    llm_client: Option<Arc<LLMClient>>,
    /// When true, `process_with_llm` is used to handle messages; otherwise
    /// protocols fall back to canned text responses.
    use_llm: bool,
}
52
53impl LLMProtocolHelper {
54 pub fn new(agent_id: impl Into<String>) -> Self {
56 Self {
57 agent_id: agent_id.into(),
58 llm_client: None,
59 use_llm: false,
60 }
61 }
62
63 pub fn with_llm(mut self, llm_client: Arc<LLMClient>) -> Self {
65 self.llm_client = Some(llm_client);
66 self.use_llm = true;
67 self
68 }
69
70 pub fn with_use_llm(mut self, use_llm: bool) -> Self {
72 self.use_llm = use_llm;
73 self
74 }
75
76 pub fn agent_id(&self) -> &str {
78 &self.agent_id
79 }
80
81 pub async fn process_with_llm(
83 &self,
84 msg: &CollaborationMessage,
85 system_prompt: &str,
86 ) -> anyhow::Result<CollaborationContent> {
87 let llm_client = self
88 .llm_client
89 .as_ref()
90 .ok_or_else(|| anyhow::anyhow!("LLM client not configured"))?;
91
92 let user_prompt = format!(
93 "你是 {}。收到一条协作消息:\n\n发送者: {}\n内容: {}\n\n请处理这条消息并返回响应。",
94 self.agent_id,
95 msg.sender,
96 msg.content.to_text()
97 );
98
99 let response = llm_client
100 .chat()
101 .system(system_prompt)
102 .user(&user_prompt)
103 .send()
104 .await?;
105
106 Ok(CollaborationContent::LLMResponse {
107 reasoning: "通过 LLM 分析和处理协作消息".to_string(),
108 conclusion: response.content().unwrap_or("").to_string(),
109 data: serde_json::json!({
110 "original_sender": msg.sender,
111 "original_content": msg.content.to_text(),
112 }),
113 })
114 }
115}
116
/// Request-response collaboration protocol: one-to-one messaging where the
/// requester waits for a single reply.
pub struct RequestResponseProtocol {
    /// Agent identity plus optional LLM plumbing.
    helper: LLMProtocolHelper,
    /// In-memory message buffer shared across tasks.
    message_queue: Arc<RwLock<Vec<CollaborationMessage>>>,
}
128
129impl RequestResponseProtocol {
130 pub fn new(agent_id: impl Into<String>) -> Self {
132 Self {
133 helper: LLMProtocolHelper::new(agent_id),
134 message_queue: Arc::new(RwLock::new(Vec::new())),
135 }
136 }
137
138 pub fn with_llm(agent_id: impl Into<String>, llm_client: Arc<LLMClient>) -> Self {
140 Self {
141 helper: LLMProtocolHelper::new(agent_id).with_llm(llm_client),
142 message_queue: Arc::new(RwLock::new(Vec::new())),
143 }
144 }
145}
146
147#[async_trait]
148impl CollaborationProtocol for RequestResponseProtocol {
149 fn name(&self) -> &str {
150 "request_response"
151 }
152
153 fn description(&self) -> &str {
154 "请求-响应协作协议:一对一通信,同步等待结果"
155 }
156
157 fn applicable_scenarios(&self) -> Vec<String> {
158 vec![
159 "数据查询和处理".to_string(),
160 "确定性任务执行".to_string(),
161 "状态获取".to_string(),
162 "简单问答".to_string(),
163 ]
164 }
165
166 fn mode(&self) -> CollaborationMode {
167 CollaborationMode::RequestResponse
168 }
169
170 async fn send_message(&self, msg: CollaborationMessage) -> anyhow::Result<()> {
171 let mut queue = self.message_queue.write().await;
172 queue.push(msg);
173 Ok(())
174 }
175
176 async fn receive_message(&self) -> anyhow::Result<Option<CollaborationMessage>> {
177 let mut queue = self.message_queue.write().await;
178 Ok(queue.pop())
179 }
180
181 async fn process_message(
182 &self,
183 msg: CollaborationMessage,
184 ) -> anyhow::Result<CollaborationResult> {
185 let start = std::time::Instant::now();
186
187 tracing::debug!(
188 "[RequestResponse] {} processing message from {}",
189 self.helper.agent_id(),
190 msg.sender
191 );
192
193 let content = if self.helper.use_llm {
195 self.helper
196 .process_with_llm(
197 &msg,
198 "你是一个协作 Agent,负责处理请求-响应模式的协作消息。请理解请求内容并提供准确的响应。",
199 )
200 .await?
201 } else {
202 CollaborationContent::Text(format!("已收到来自 {} 的请求并处理", msg.sender))
203 };
204
205 let duration = start.elapsed().as_millis() as u64;
206
207 Ok(
208 CollaborationResult::success(content, duration, CollaborationMode::RequestResponse)
209 .with_participant(self.helper.agent_id().to_string())
210 .with_participant(msg.sender),
211 )
212 }
213
214 fn stats(&self) -> HashMap<String, serde_json::Value> {
215 HashMap::from([
216 (
217 "agent_id".to_string(),
218 serde_json::json!(self.helper.agent_id()),
219 ),
220 (
221 "use_llm".to_string(),
222 serde_json::json!(self.helper.use_llm),
223 ),
224 ])
225 }
226}
227
/// Publish-subscribe collaboration protocol: one-to-many messaging for
/// divergent/broadcast-style tasks.
pub struct PublishSubscribeProtocol {
    /// Agent identity plus optional LLM plumbing.
    helper: LLMProtocolHelper,
    /// In-memory message buffer shared across tasks.
    message_queue: Arc<RwLock<Vec<CollaborationMessage>>>,
    /// Topics this agent has subscribed to via `subscribe`.
    subscribed_topics: Arc<RwLock<std::collections::HashSet<String>>>,
}
238
239impl PublishSubscribeProtocol {
240 pub fn new(agent_id: impl Into<String>) -> Self {
242 Self {
243 helper: LLMProtocolHelper::new(agent_id),
244 message_queue: Arc::new(RwLock::new(Vec::new())),
245 subscribed_topics: Arc::new(RwLock::new(std::collections::HashSet::new())),
246 }
247 }
248
249 pub fn with_llm(agent_id: impl Into<String>, llm_client: Arc<LLMClient>) -> Self {
251 Self {
252 helper: LLMProtocolHelper::new(agent_id).with_llm(llm_client),
253 message_queue: Arc::new(RwLock::new(Vec::new())),
254 subscribed_topics: Arc::new(RwLock::new(std::collections::HashSet::new())),
255 }
256 }
257
258 pub async fn subscribe(&self, topic: String) -> anyhow::Result<()> {
260 let mut subscribed = self.subscribed_topics.write().await;
261 subscribed.insert(topic.clone());
262 tracing::debug!(
263 "[PublishSubscribe] {} subscribed to topic: {}",
264 self.helper.agent_id(),
265 topic
266 );
267 Ok(())
268 }
269}
270
271#[async_trait]
272impl CollaborationProtocol for PublishSubscribeProtocol {
273 fn name(&self) -> &str {
274 "publish_subscribe"
275 }
276
277 fn description(&self) -> &str {
278 "发布-订阅协作协议:一对多通信,适合发散性任务和创意生成"
279 }
280
281 fn applicable_scenarios(&self) -> Vec<String> {
282 vec![
283 "创意生成和发散".to_string(),
284 "通知广播".to_string(),
285 "事件传播".to_string(),
286 "多人协作".to_string(),
287 ]
288 }
289
290 fn mode(&self) -> CollaborationMode {
291 CollaborationMode::PublishSubscribe
292 }
293
294 async fn send_message(&self, msg: CollaborationMessage) -> anyhow::Result<()> {
295 let mut queue = self.message_queue.write().await;
296 queue.push(msg);
297 Ok(())
298 }
299
300 async fn receive_message(&self) -> anyhow::Result<Option<CollaborationMessage>> {
301 let mut queue = self.message_queue.write().await;
302 Ok(queue.pop())
303 }
304
305 async fn process_message(
306 &self,
307 msg: CollaborationMessage,
308 ) -> anyhow::Result<CollaborationResult> {
309 let start = std::time::Instant::now();
310
311 tracing::debug!(
312 "[PublishSubscribe] {} processing message from {} on topic {:?}",
313 self.helper.agent_id(),
314 msg.sender,
315 msg.topic
316 );
317
318 let content = if self.helper.use_llm {
319 self.helper
320 .process_with_llm(
321 &msg,
322 "你是一个协作 Agent,负责处理发布-订阅模式的协作消息。消息是发布给多个订阅者的,请提供适当的响应。",
323 )
324 .await?
325 } else {
326 CollaborationContent::Text(format!("已发布消息到主题 {:?}", msg.topic))
327 };
328
329 let duration = start.elapsed().as_millis() as u64;
330
331 Ok(
332 CollaborationResult::success(content, duration, CollaborationMode::PublishSubscribe)
333 .with_participant(self.helper.agent_id().to_string())
334 .with_participant(msg.sender),
335 )
336 }
337
338 fn stats(&self) -> HashMap<String, serde_json::Value> {
339 let topics = self.subscribed_topics.blocking_read();
340 HashMap::from([
341 (
342 "agent_id".to_string(),
343 serde_json::json!(self.helper.agent_id()),
344 ),
345 (
346 "use_llm".to_string(),
347 serde_json::json!(self.helper.use_llm),
348 ),
349 (
350 "subscribed_topics".to_string(),
351 serde_json::json!(topics.len()),
352 ),
353 ])
354 }
355}
356
/// Consensus collaboration protocol: multiple agents negotiate toward a
/// shared decision.
pub struct ConsensusProtocol {
    /// Agent identity plus optional LLM plumbing.
    helper: LLMProtocolHelper,
    /// In-memory message buffer shared across tasks.
    message_queue: Arc<RwLock<Vec<CollaborationMessage>>>,
    /// Agreement ratio required to reach consensus (defaults to 0.7).
    threshold: f32,
}
367
368impl ConsensusProtocol {
369 pub fn new(agent_id: impl Into<String>) -> Self {
371 Self {
372 helper: LLMProtocolHelper::new(agent_id),
373 message_queue: Arc::new(RwLock::new(Vec::new())),
374 threshold: 0.7,
375 }
376 }
377
378 pub fn with_llm(agent_id: impl Into<String>, llm_client: Arc<LLMClient>) -> Self {
380 Self {
381 helper: LLMProtocolHelper::new(agent_id).with_llm(llm_client),
382 message_queue: Arc::new(RwLock::new(Vec::new())),
383 threshold: 0.7,
384 }
385 }
386}
387
388#[async_trait]
389impl CollaborationProtocol for ConsensusProtocol {
390 fn name(&self) -> &str {
391 "consensus"
392 }
393
394 fn description(&self) -> &str {
395 "共识协作协议:多 Agent 协商决策,适合需要达成一致的场景"
396 }
397
398 fn applicable_scenarios(&self) -> Vec<String> {
399 vec![
400 "决策制定".to_string(),
401 "投票评估".to_string(),
402 "方案选择".to_string(),
403 "质量评审".to_string(),
404 ]
405 }
406
407 fn mode(&self) -> CollaborationMode {
408 CollaborationMode::Consensus
409 }
410
411 async fn send_message(&self, msg: CollaborationMessage) -> anyhow::Result<()> {
412 let mut queue = self.message_queue.write().await;
413 queue.push(msg);
414 Ok(())
415 }
416
417 async fn receive_message(&self) -> anyhow::Result<Option<CollaborationMessage>> {
418 let mut queue = self.message_queue.write().await;
419 Ok(queue.pop())
420 }
421
422 async fn process_message(
423 &self,
424 msg: CollaborationMessage,
425 ) -> anyhow::Result<CollaborationResult> {
426 let start = std::time::Instant::now();
427
428 tracing::debug!(
429 "[Consensus] {} processing message from {}",
430 self.helper.agent_id(),
431 msg.sender
432 );
433
434 let content = if self.helper.use_llm {
435 self.helper
436 .process_with_llm(
437 &msg,
438 "你是一个协作 Agent,负责处理共识机制的协作消息。请分析多方意见并提供综合判断。",
439 )
440 .await?
441 } else {
442 CollaborationContent::Text(format!("已参与共识决策,阈值: {}", self.threshold))
443 };
444
445 let duration = start.elapsed().as_millis() as u64;
446
447 Ok(
448 CollaborationResult::success(content, duration, CollaborationMode::Consensus)
449 .with_participant(self.helper.agent_id().to_string())
450 .with_participant(msg.sender),
451 )
452 }
453
454 fn stats(&self) -> HashMap<String, serde_json::Value> {
455 HashMap::from([
456 (
457 "agent_id".to_string(),
458 serde_json::json!(self.helper.agent_id()),
459 ),
460 (
461 "use_llm".to_string(),
462 serde_json::json!(self.helper.use_llm),
463 ),
464 ("threshold".to_string(), serde_json::json!(self.threshold)),
465 ])
466 }
467}
468
/// Debate collaboration protocol: multi-round discussion for iterative
/// refinement and review tasks.
pub struct DebateProtocol {
    /// Agent identity plus optional LLM plumbing.
    helper: LLMProtocolHelper,
    /// In-memory message buffer shared across tasks.
    message_queue: Arc<RwLock<Vec<CollaborationMessage>>>,
    /// Upper bound on debate rounds (defaults to 3).
    max_rounds: usize,
}
479
480impl DebateProtocol {
481 pub fn new(agent_id: impl Into<String>) -> Self {
483 Self {
484 helper: LLMProtocolHelper::new(agent_id),
485 message_queue: Arc::new(RwLock::new(Vec::new())),
486 max_rounds: 3,
487 }
488 }
489
490 pub fn with_llm(agent_id: impl Into<String>, llm_client: Arc<LLMClient>) -> Self {
492 Self {
493 helper: LLMProtocolHelper::new(agent_id).with_llm(llm_client),
494 message_queue: Arc::new(RwLock::new(Vec::new())),
495 max_rounds: 3,
496 }
497 }
498}
499
500#[async_trait]
501impl CollaborationProtocol for DebateProtocol {
502 fn name(&self) -> &str {
503 "debate"
504 }
505
506 fn description(&self) -> &str {
507 "辩论协作协议:多轮讨论优化,适合需要迭代改进的审查任务"
508 }
509
510 fn applicable_scenarios(&self) -> Vec<String> {
511 vec![
512 "代码审查".to_string(),
513 "方案优化".to_string(),
514 "争议解决".to_string(),
515 "质量改进".to_string(),
516 ]
517 }
518
519 fn mode(&self) -> CollaborationMode {
520 CollaborationMode::Debate
521 }
522
523 async fn send_message(&self, msg: CollaborationMessage) -> anyhow::Result<()> {
524 let mut queue = self.message_queue.write().await;
525 queue.push(msg);
526 Ok(())
527 }
528
529 async fn receive_message(&self) -> anyhow::Result<Option<CollaborationMessage>> {
530 let mut queue = self.message_queue.write().await;
531 Ok(queue.pop())
532 }
533
534 async fn process_message(
535 &self,
536 msg: CollaborationMessage,
537 ) -> anyhow::Result<CollaborationResult> {
538 let start = std::time::Instant::now();
539
540 tracing::debug!(
541 "[Debate] {} processing message from {}",
542 self.helper.agent_id(),
543 msg.sender
544 );
545
546 let content = if self.helper.use_llm {
547 self.helper
548 .process_with_llm(
549 &msg,
550 "你是一个协作 Agent,负责处理辩论模式的协作消息。请提供有建设性的观点和反驳。",
551 )
552 .await?
553 } else {
554 CollaborationContent::Text(format!("已参与辩论,最大轮数: {}", self.max_rounds))
555 };
556
557 let duration = start.elapsed().as_millis() as u64;
558
559 Ok(
560 CollaborationResult::success(content, duration, CollaborationMode::Debate)
561 .with_participant(self.helper.agent_id().to_string())
562 .with_participant(msg.sender),
563 )
564 }
565
566 fn stats(&self) -> HashMap<String, serde_json::Value> {
567 HashMap::from([
568 (
569 "agent_id".to_string(),
570 serde_json::json!(self.helper.agent_id()),
571 ),
572 (
573 "use_llm".to_string(),
574 serde_json::json!(self.helper.use_llm),
575 ),
576 ("max_rounds".to_string(), serde_json::json!(self.max_rounds)),
577 ])
578 }
579}
580
/// Parallel collaboration protocol: independent subtasks processed
/// concurrently.
pub struct ParallelProtocol {
    /// Agent identity plus optional LLM plumbing.
    helper: LLMProtocolHelper,
    /// In-memory message buffer shared across tasks.
    message_queue: Arc<RwLock<Vec<CollaborationMessage>>>,
    /// Maximum number of concurrent workers (defaults to 4).
    max_workers: usize,
}
591
592impl ParallelProtocol {
593 pub fn new(agent_id: impl Into<String>) -> Self {
595 Self {
596 helper: LLMProtocolHelper::new(agent_id),
597 message_queue: Arc::new(RwLock::new(Vec::new())),
598 max_workers: 4,
599 }
600 }
601
602 pub fn with_llm(agent_id: impl Into<String>, llm_client: Arc<LLMClient>) -> Self {
604 Self {
605 helper: LLMProtocolHelper::new(agent_id).with_llm(llm_client),
606 message_queue: Arc::new(RwLock::new(Vec::new())),
607 max_workers: 4,
608 }
609 }
610}
611
612#[async_trait]
613impl CollaborationProtocol for ParallelProtocol {
614 fn name(&self) -> &str {
615 "parallel"
616 }
617
618 fn description(&self) -> &str {
619 "并行协作协议:同时执行独立任务,适合可分解的工作"
620 }
621
622 fn applicable_scenarios(&self) -> Vec<String> {
623 vec![
624 "数据分析".to_string(),
625 "批量处理".to_string(),
626 "分布式搜索".to_string(),
627 "并行计算".to_string(),
628 ]
629 }
630
631 fn mode(&self) -> CollaborationMode {
632 CollaborationMode::Parallel
633 }
634
635 async fn send_message(&self, msg: CollaborationMessage) -> anyhow::Result<()> {
636 let mut queue = self.message_queue.write().await;
637 queue.push(msg);
638 Ok(())
639 }
640
641 async fn receive_message(&self) -> anyhow::Result<Option<CollaborationMessage>> {
642 let mut queue = self.message_queue.write().await;
643 Ok(queue.pop())
644 }
645
646 async fn process_message(
647 &self,
648 msg: CollaborationMessage,
649 ) -> anyhow::Result<CollaborationResult> {
650 let start = std::time::Instant::now();
651
652 tracing::debug!(
653 "[Parallel] {} processing message from {}",
654 self.helper.agent_id(),
655 msg.sender
656 );
657
658 let content = if self.helper.use_llm {
659 self.helper
660 .process_with_llm(
661 &msg,
662 "你是一个协作 Agent,负责处理并行模式的协作消息。任务会被分解并并行处理。",
663 )
664 .await?
665 } else {
666 CollaborationContent::Text(format!(
667 "已启动并行处理,最大工作线程: {}",
668 self.max_workers
669 ))
670 };
671
672 let duration = start.elapsed().as_millis() as u64;
673
674 Ok(
675 CollaborationResult::success(content, duration, CollaborationMode::Parallel)
676 .with_participant(self.helper.agent_id().to_string())
677 .with_participant(msg.sender),
678 )
679 }
680
681 fn stats(&self) -> HashMap<String, serde_json::Value> {
682 HashMap::from([
683 (
684 "agent_id".to_string(),
685 serde_json::json!(self.helper.agent_id()),
686 ),
687 (
688 "use_llm".to_string(),
689 serde_json::json!(self.helper.use_llm),
690 ),
691 (
692 "max_workers".to_string(),
693 serde_json::json!(self.max_workers),
694 ),
695 ])
696 }
697}
698
/// Sequential collaboration protocol: dependent tasks executed one after
/// another, pipeline-style.
pub struct SequentialProtocol {
    /// Agent identity plus optional LLM plumbing.
    helper: LLMProtocolHelper,
    /// In-memory message buffer shared across tasks.
    message_queue: Arc<RwLock<Vec<CollaborationMessage>>>,
}
708
709impl SequentialProtocol {
710 pub fn new(agent_id: impl Into<String>) -> Self {
712 Self {
713 helper: LLMProtocolHelper::new(agent_id),
714 message_queue: Arc::new(RwLock::new(Vec::new())),
715 }
716 }
717
718 pub fn with_llm(agent_id: impl Into<String>, llm_client: Arc<LLMClient>) -> Self {
720 Self {
721 helper: LLMProtocolHelper::new(agent_id).with_llm(llm_client),
722 message_queue: Arc::new(RwLock::new(Vec::new())),
723 }
724 }
725}
726
727#[async_trait]
728impl CollaborationProtocol for SequentialProtocol {
729 fn name(&self) -> &str {
730 "sequential"
731 }
732
733 fn description(&self) -> &str {
734 "顺序协作协议:串行执行有依赖的任务链,适合流水线处理"
735 }
736
737 fn applicable_scenarios(&self) -> Vec<String> {
738 vec![
739 "流水线处理".to_string(),
740 "依赖任务链".to_string(),
741 "分步执行".to_string(),
742 "阶段式工作流".to_string(),
743 ]
744 }
745
746 fn mode(&self) -> CollaborationMode {
747 CollaborationMode::Sequential
748 }
749
750 async fn send_message(&self, msg: CollaborationMessage) -> anyhow::Result<()> {
751 let mut queue = self.message_queue.write().await;
752 queue.push(msg);
753 Ok(())
754 }
755
756 async fn receive_message(&self) -> anyhow::Result<Option<CollaborationMessage>> {
757 let mut queue = self.message_queue.write().await;
758 Ok(queue.pop())
759 }
760
761 async fn process_message(
762 &self,
763 msg: CollaborationMessage,
764 ) -> anyhow::Result<CollaborationResult> {
765 let start = std::time::Instant::now();
766
767 tracing::debug!(
768 "[Sequential] {} processing message from {}",
769 self.helper.agent_id(),
770 msg.sender
771 );
772
773 let content = if self.helper.use_llm {
774 self.helper
775 .process_with_llm(
776 &msg,
777 "你是一个协作 Agent,负责处理顺序模式的协作消息。任务会按依赖关系依次执行。",
778 )
779 .await?
780 } else {
781 CollaborationContent::Text("已执行顺序任务链".to_string())
782 };
783
784 let duration = start.elapsed().as_millis() as u64;
785
786 Ok(
787 CollaborationResult::success(content, duration, CollaborationMode::Sequential)
788 .with_participant(self.helper.agent_id().to_string())
789 .with_participant(msg.sender),
790 )
791 }
792
793 fn stats(&self) -> HashMap<String, serde_json::Value> {
794 HashMap::from([
795 (
796 "agent_id".to_string(),
797 serde_json::json!(self.helper.agent_id()),
798 ),
799 (
800 "use_llm".to_string(),
801 serde_json::json!(self.helper.use_llm),
802 ),
803 ])
804 }
805}