1use std::future::Future;
37use std::pin::Pin;
38use std::sync::Arc;
39
40use std::collections::HashMap;
41
42use swarm_engine_core::actions::ActionDef;
43use swarm_engine_core::agent::{BatchDecisionRequest, DecisionResponse, WorkerDecisionRequest};
44use swarm_engine_core::exploration::DependencyGraph;
45use swarm_engine_core::types::{LoraConfig, WorkerId};
46
47use crate::decider::{LlmDecider, LlmError};
48
49pub type BatchProcessResult = Vec<(WorkerId, Result<DecisionResponse, BatchProcessError>)>;
55
56#[derive(Debug, Clone, thiserror::Error)]
58pub enum BatchProcessError {
59 #[error("Batch process error (transient): {0}")]
61 Transient(String),
62
63 #[error("Batch process error: {0}")]
65 Permanent(String),
66}
67
68impl BatchProcessError {
69 pub fn transient(message: impl Into<String>) -> Self {
70 Self::Transient(message.into())
71 }
72
73 pub fn permanent(message: impl Into<String>) -> Self {
74 Self::Permanent(message.into())
75 }
76
77 pub fn is_transient(&self) -> bool {
78 matches!(self, Self::Transient(_))
79 }
80
81 pub fn message(&self) -> &str {
82 match self {
83 Self::Transient(msg) => msg,
84 Self::Permanent(msg) => msg,
85 }
86 }
87}
88
89impl From<LlmError> for BatchProcessError {
90 fn from(e: LlmError) -> Self {
91 if e.is_transient() {
92 Self::Transient(e.message().to_string())
93 } else {
94 Self::Permanent(e.message().to_string())
95 }
96 }
97}
98
99impl From<swarm_engine_core::error::SwarmError> for BatchProcessError {
100 fn from(err: swarm_engine_core::error::SwarmError) -> Self {
101 if err.is_transient() {
102 Self::Transient(err.message())
103 } else {
104 Self::Permanent(err.message())
105 }
106 }
107}
108
109impl From<BatchProcessError> for swarm_engine_core::error::SwarmError {
110 fn from(err: BatchProcessError) -> Self {
111 match err {
112 BatchProcessError::Transient(message) => {
113 swarm_engine_core::error::SwarmError::LlmTransient { message }
114 }
115 BatchProcessError::Permanent(message) => {
116 swarm_engine_core::error::SwarmError::LlmPermanent { message }
117 }
118 }
119 }
120}
121
122pub trait BatchProcessor: Send + Sync {
127 fn process(
135 &self,
136 request: BatchDecisionRequest,
137 ) -> Pin<Box<dyn Future<Output = BatchProcessResult> + Send + '_>>;
138
139 fn plan_dependencies(
147 &self,
148 _task: &str,
149 _actions: &[ActionDef],
150 ) -> Pin<Box<dyn Future<Output = Option<DependencyGraph>> + Send + '_>> {
151 Box::pin(async { None })
152 }
153
154 fn is_healthy(&self) -> Pin<Box<dyn Future<Output = bool> + Send + '_>>;
156
157 fn name(&self) -> &str;
159}
160
/// Tunables for [`LlmBatchProcessor`].
#[derive(Debug, Clone)]
pub struct LlmBatchProcessorConfig {
    /// When `true`, `process` dispatches requests concurrently; otherwise they
    /// are sent to the decider one at a time.
    pub parallel: bool,
    /// Concurrency limit used when the decider does not report one of its own.
    pub max_concurrency: usize,
    /// Retry budget. NOTE(review): no code in this file reads this field;
    /// confirm where (or whether) it is honored.
    pub max_retries: Option<usize>,
}
175
176impl Default for LlmBatchProcessorConfig {
177 fn default() -> Self {
178 Self {
179 parallel: true,
180 max_concurrency: 4,
181 max_retries: Some(5),
182 }
183 }
184}
185
186pub struct LlmBatchProcessor<D: LlmDecider> {
191 decider: Arc<D>,
192 config: LlmBatchProcessorConfig,
193}
194
195impl<D: LlmDecider> LlmBatchProcessor<D> {
196 pub fn new(decider: D) -> Self {
198 Self {
199 decider: Arc::new(decider),
200 config: LlmBatchProcessorConfig::default(),
201 }
202 }
203
204 pub fn from_arc(decider: Arc<D>) -> Self {
206 Self {
207 decider,
208 config: LlmBatchProcessorConfig::default(),
209 }
210 }
211
212 pub fn with_config(mut self, config: LlmBatchProcessorConfig) -> Self {
214 self.config = config;
215 self
216 }
217}
218
219impl<D: LlmDecider + 'static> BatchProcessor for LlmBatchProcessor<D> {
220 fn process(
221 &self,
222 request: BatchDecisionRequest,
223 ) -> Pin<Box<dyn Future<Output = BatchProcessResult> + Send + '_>> {
224 Box::pin(async move {
225 if request.requests.is_empty() {
226 return vec![];
227 }
228
229 let requests: Vec<(WorkerId, WorkerDecisionRequest)> = request
231 .requests
232 .into_iter()
233 .map(|r| (r.worker_id, r))
234 .collect();
235
236 if self.config.parallel {
237 self.process_parallel(requests).await
238 } else {
239 self.process_sequential(requests).await
240 }
241 })
242 }
243
244 fn plan_dependencies(
245 &self,
246 task: &str,
247 actions: &[ActionDef],
248 ) -> Pin<Box<dyn Future<Output = Option<DependencyGraph>> + Send + '_>> {
249 let task = task.to_string();
250 let actions: Vec<ActionDef> = actions.to_vec();
251 let decider = Arc::clone(&self.decider);
252
253 Box::pin(async move {
254 use swarm_engine_core::actions::ActionCategory;
255 use swarm_engine_core::exploration::DependencyGraphBuilder;
256
257 let action_names: Vec<String> = actions.iter().map(|a| a.name.clone()).collect();
258
259 let discover: Vec<&ActionDef> = actions
261 .iter()
262 .filter(|a| a.category == ActionCategory::NodeExpand)
263 .collect();
264 let not_discover: Vec<&ActionDef> = actions
265 .iter()
266 .filter(|a| a.category == ActionCategory::NodeStateChange)
267 .collect();
268
269 tracing::debug!(
270 discover = ?discover.iter().map(|a| &a.name).collect::<Vec<_>>(),
271 not_discover = ?not_discover.iter().map(|a| &a.name).collect::<Vec<_>>(),
272 "Separated actions by category"
273 );
274
275 let sorted_discover = if discover.len() <= 1 {
277 discover.iter().map(|a| a.name.clone()).collect()
278 } else {
279 binary_sort_actions(&task, &discover, decider.as_ref()).await
280 };
281
282 tracing::debug!(
283 sorted = ?sorted_discover,
284 "Sorted Discover actions via binary comparison"
285 );
286
287 let sorted_not_discover = if not_discover.len() <= 1 {
289 not_discover.iter().map(|a| a.name.clone()).collect()
290 } else {
291 binary_sort_actions(&task, ¬_discover, decider.as_ref()).await
292 };
293
294 tracing::debug!(
295 sorted = ?sorted_not_discover,
296 "Sorted NotDiscover actions via binary comparison"
297 );
298
299 let mut builder = DependencyGraphBuilder::new()
301 .task(&task)
302 .available_actions(action_names.clone());
303
304 if !sorted_discover.is_empty() {
306 builder = builder.start_node(&sorted_discover[0]);
307 } else if !sorted_not_discover.is_empty() {
308 builder = builder.start_node(&sorted_not_discover[0]);
310 }
311
312 if let Some(last) = sorted_not_discover.last() {
314 builder = builder.terminal_node(last);
315 } else if !sorted_discover.is_empty() {
316 builder = builder.terminal_node(sorted_discover.last().unwrap());
318 }
319
320 for window in sorted_discover.windows(2) {
322 builder = builder.edge(&window[0], &window[1], 0.9);
323 }
324
325 if !sorted_discover.is_empty() && !sorted_not_discover.is_empty() {
327 builder = builder.edge(
328 sorted_discover.last().unwrap(),
329 &sorted_not_discover[0],
330 0.9,
331 );
332 }
333
334 for window in sorted_not_discover.windows(2) {
336 builder = builder.edge(&window[0], &window[1], 0.9);
337 }
338
339 let graph = builder.build();
340
341 tracing::info!(
342 discover_order = ?sorted_discover,
343 not_discover_order = ?sorted_not_discover,
344 edges = graph.edges().len(),
345 "DependencyGraph generated via binary sort"
346 );
347
348 Some(graph)
349 })
350 }
351
352 fn is_healthy(&self) -> Pin<Box<dyn Future<Output = bool> + Send + '_>> {
353 let decider = Arc::clone(&self.decider);
354 Box::pin(async move { decider.is_healthy().await })
355 }
356
357 fn name(&self) -> &str {
358 self.decider.model_name()
359 }
360}
361
362impl<D: LlmDecider + 'static> LlmBatchProcessor<D> {
363 async fn process_parallel(
380 &self,
381 requests: Vec<(WorkerId, WorkerDecisionRequest)>,
382 ) -> BatchProcessResult {
383 let grouped = group_by_lora(requests);
385
386 let group_count = grouped.len();
387 if group_count > 1 {
388 tracing::debug!(
389 groups = group_count,
390 "Processing requests in {} LoRA groups",
391 group_count
392 );
393 }
394
395 let mut all_results = Vec::new();
397 for (lora_config, group_requests) in grouped {
398 if group_count > 1 {
399 tracing::trace!(
400 lora = ?lora_config,
401 count = group_requests.len(),
402 "Processing LoRA group"
403 );
404 }
405 let results = self.process_group(group_requests).await;
406 all_results.extend(results);
407 }
408
409 all_results
410 }
411
412 async fn process_group(
414 &self,
415 requests: Vec<(WorkerId, WorkerDecisionRequest)>,
416 ) -> BatchProcessResult {
417 use futures::future::join_all;
418 use tokio::sync::Semaphore;
419
420 let max_concurrency = self
422 .decider
423 .max_concurrency()
424 .await
425 .unwrap_or(self.config.max_concurrency);
426
427 let semaphore = Arc::new(Semaphore::new(max_concurrency));
428
429 let futures: Vec<_> = requests
430 .into_iter()
431 .map(|(worker_id, req)| {
432 let decider = Arc::clone(&self.decider);
433 let sem = Arc::clone(&semaphore);
434 async move {
435 let _permit = sem.acquire().await.expect("Semaphore closed");
437 let result = decider.decide(req).await;
438 (worker_id, result)
439 }
440 })
441 .collect();
442
443 let results = join_all(futures).await;
444
445 results
446 .into_iter()
447 .map(|(worker_id, result)| {
448 let mapped = result.map_err(BatchProcessError::from);
449 (worker_id, mapped)
450 })
451 .collect()
452 }
453
454 async fn process_sequential(
456 &self,
457 requests: Vec<(WorkerId, WorkerDecisionRequest)>,
458 ) -> BatchProcessResult {
459 let mut results = Vec::with_capacity(requests.len());
460
461 for (worker_id, req) in requests {
462 let result = self.decider.decide(req).await;
463 let mapped = result.map_err(BatchProcessError::from);
464 results.push((worker_id, mapped));
465 }
466
467 results
468 }
469}
470
471fn group_by_lora(
476 requests: Vec<(WorkerId, WorkerDecisionRequest)>,
477) -> HashMap<Option<LoraConfig>, Vec<(WorkerId, WorkerDecisionRequest)>> {
478 let mut groups: HashMap<Option<LoraConfig>, Vec<(WorkerId, WorkerDecisionRequest)>> =
479 HashMap::new();
480
481 for (worker_id, req) in requests {
482 let lora_key = req.lora.clone();
483 groups.entry(lora_key).or_default().push((worker_id, req));
484 }
485
486 groups
487}
488
489async fn binary_sort_actions<D: LlmDecider>(
498 task: &str,
499 actions: &[&ActionDef],
500 decider: &D,
501) -> Vec<String> {
502 use futures::future::join_all;
503 use std::collections::HashMap;
504
505 if actions.len() <= 1 {
506 return actions.iter().map(|a| a.name.clone()).collect();
507 }
508
509 let mut requests: Vec<(usize, usize, String, String, String)> = Vec::new();
512 let mut pair_index = 0;
513
514 for i in 0..actions.len() {
515 for j in (i + 1)..actions.len() {
516 let a = actions[i];
517 let b = actions[j];
518 let prompt = format!(
519 "Goal: {}\n- {}: {}\n- {}: {}\nWhich comes first: {} or {}?\nAnswer (one word):",
520 task, a.name, a.description, b.name, b.description, a.name, b.name
521 );
522
523 for vote_idx in 0..3 {
525 requests.push((
526 pair_index,
527 vote_idx,
528 prompt.clone(),
529 a.name.clone(),
530 b.name.clone(),
531 ));
532 }
533 pair_index += 1;
534 }
535 }
536
537 let total_requests = requests.len();
538 tracing::debug!(
539 pairs = pair_index,
540 total_requests = total_requests,
541 "Binary sort: sending batch requests"
542 );
543
544 let futures: Vec<_> = requests
547 .into_iter()
548 .map(|(pair_idx, vote_idx, prompt, a_name, b_name)| {
549 let decider_ref = decider;
550 async move {
551 let result = decider_ref.call_raw(&prompt, None).await;
552 (pair_idx, vote_idx, result, a_name, b_name)
553 }
554 })
555 .collect();
556
557 let results = join_all(futures).await;
558
559 let mut pair_votes: HashMap<usize, (usize, usize, String, String)> = HashMap::new();
562
563 for (pair_idx, _vote_idx, result, a_name, b_name) in results {
564 let entry = pair_votes
565 .entry(pair_idx)
566 .or_insert((0, 0, a_name.clone(), b_name.clone()));
567
568 if let Ok(response) = result {
569 let response_upper = response.to_uppercase();
570 let a_upper = a_name.to_uppercase();
571 let b_upper = b_name.to_uppercase();
572
573 if response_upper.contains(&a_upper) {
574 entry.0 += 1;
575 } else if response_upper.contains(&b_upper) {
576 entry.1 += 1;
577 }
578 }
579 }
580
581 let mut wins: HashMap<String, usize> = HashMap::new();
583 for a in actions {
584 wins.insert(a.name.clone(), 0);
585 }
586
587 for (_pair_idx, (a_count, b_count, a_name, b_name)) in pair_votes {
588 if a_count >= b_count {
590 *wins.get_mut(&b_name).unwrap() += 1;
592 } else {
593 *wins.get_mut(&a_name).unwrap() += 1;
595 }
596 }
597
598 let mut sorted: Vec<_> = wins.into_iter().collect();
600 sorted.sort_by_key(|(_, count)| *count);
601
602 tracing::debug!(
603 sorted = ?sorted.iter().map(|(n, c)| format!("{}:{}", n, c)).collect::<Vec<_>>(),
604 "Binary sort completed"
605 );
606
607 sorted.into_iter().map(|(name, _)| name).collect()
608}
609
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::HashMap;

    use swarm_engine_core::context::{ContextTarget, GlobalContext, ResolvedContext};

    // ---- BatchProcessError and configuration ----

    #[test]
    fn test_batch_process_error_transient() {
        let error = BatchProcessError::transient("connection timeout");
        assert!(error.is_transient());
        assert_eq!(error.message(), "connection timeout");
    }

    #[test]
    fn test_batch_process_error_permanent() {
        let error = BatchProcessError::permanent("invalid model");
        assert!(!error.is_transient());
        assert_eq!(error.message(), "invalid model");
    }

    #[test]
    fn test_batch_process_error_from_llm_error() {
        let batch_err = BatchProcessError::from(LlmError::transient("timeout"));
        assert!(batch_err.is_transient());
        assert_eq!(batch_err.message(), "timeout");
    }

    #[test]
    fn test_ollama_batch_processor_config_default() {
        let LlmBatchProcessorConfig {
            parallel,
            max_concurrency,
            ..
        } = LlmBatchProcessorConfig::default();
        assert!(parallel);
        assert_eq!(max_concurrency, 4);
    }

    // ---- Pairwise ordering, modelled synchronously ----

    /// Synchronous stand-in for the pairwise ranking: `comparator` returns the
    /// name that should come first, and each action is ranked by how many
    /// comparisons it lost.
    fn binary_sort_sync(
        actions: &[&str],
        comparator: impl Fn(&str, &str) -> String,
    ) -> Vec<String> {
        if actions.len() <= 1 {
            return actions.iter().map(|s| s.to_string()).collect();
        }

        let mut wins: HashMap<String, usize> =
            actions.iter().map(|name| (name.to_string(), 0)).collect();

        for (i, &a) in actions.iter().enumerate() {
            for &b in &actions[i + 1..] {
                // Whoever is not named by the comparator lost this pair.
                let loser = if comparator(a, b) == a { b } else { a };
                *wins.get_mut(loser).unwrap() += 1;
            }
        }

        let mut sorted: Vec<_> = wins.into_iter().collect();
        sorted.sort_by_key(|(_, count)| *count);
        sorted.into_iter().map(|(name, _)| name).collect()
    }

    #[test]
    fn test_binary_sort_two_actions() {
        let pair = ["Fetch", "Summarize"];

        // Comparator always prefers the first argument.
        let first_wins = binary_sort_sync(&pair, |a, _b| a.to_string());
        assert_eq!(first_wins, vec!["Fetch", "Summarize"]);

        // Comparator always prefers the second argument.
        let second_wins = binary_sort_sync(&pair, |_a, b| b.to_string());
        assert_eq!(second_wins, vec!["Summarize", "Fetch"]);
    }

    #[test]
    fn test_binary_sort_three_actions() {
        let rank = |name: &str| {
            ["Build", "Test", "Deploy"]
                .iter()
                .position(|&x| x == name)
                .unwrap()
        };

        let result = binary_sort_sync(&["Test", "Deploy", "Build"], |a, b| {
            if rank(a) < rank(b) {
                a.to_string()
            } else {
                b.to_string()
            }
        });

        assert_eq!(result, vec!["Build", "Test", "Deploy"]);
    }

    #[test]
    fn test_binary_sort_wins_calculation() {
        let mut wins: HashMap<String, usize> = ["A", "B", "C"]
            .iter()
            .map(|name| (name.to_string(), 0))
            .collect();

        // B loses one comparison, C loses two.
        for loser in ["B", "C", "C"].iter() {
            *wins.get_mut(*loser).unwrap() += 1;
        }

        assert_eq!(wins["A"], 0);
        assert_eq!(wins["B"], 1);
        assert_eq!(wins["C"], 2);

        let mut sorted: Vec<_> = wins.into_iter().collect();
        sorted.sort_by_key(|(_, count)| *count);
        let result: Vec<_> = sorted.into_iter().map(|(name, _)| name).collect();

        assert_eq!(result, vec!["A", "B", "C"]);
    }

    // ---- Response parsing and voting ----

    /// Case-insensitive containment check; `a` is tried before `b`.
    fn extract_winner(response: &str, a: &str, b: &str) -> Option<String> {
        let haystack = response.to_uppercase();
        [a, b]
            .iter()
            .find(|name| haystack.contains(&name.to_uppercase()))
            .map(|name| name.to_string())
    }

    #[test]
    fn test_extract_winner() {
        let cases: [(&str, Option<&str>); 8] = [
            // Exact answers.
            ("Fetch", Some("Fetch")),
            ("Summarize", Some("Summarize")),
            // Leading whitespace.
            (" Fetch", Some("Fetch")),
            // Case differences.
            ("fetch", Some("Fetch")),
            ("FETCH", Some("Fetch")),
            // Name embedded in a sentence.
            ("The answer is Fetch.", Some("Fetch")),
            // Neither name present.
            ("Unknown", None),
            // Both names present: the first candidate is preferred.
            ("Fetch then Summarize", Some("Fetch")),
        ];

        for (response, expected) in cases.iter() {
            assert_eq!(
                extract_winner(response, "Fetch", "Summarize"),
                expected.map(String::from),
                "response: {:?}",
                response
            );
        }
    }

    #[test]
    fn test_vote_majority() {
        /// Majority vote over `responses`; ties go to `a`.
        fn vote_majority(responses: &[&str], a: &str, b: &str) -> String {
            let (a_count, b_count) = responses
                .iter()
                .filter_map(|response| extract_winner(response, a, b))
                .fold((0usize, 0usize), |(for_a, for_b), winner| {
                    if winner == a {
                        (for_a + 1, for_b)
                    } else {
                        (for_a, for_b + 1)
                    }
                });

            if b_count > a_count {
                b.to_string()
            } else {
                a.to_string()
            }
        }

        // Unanimous.
        assert_eq!(
            vote_majority(&["Fetch", "Fetch", "Fetch"], "Fetch", "Summarize"),
            "Fetch"
        );

        // Two against one for the first candidate.
        assert_eq!(
            vote_majority(&["Fetch", "Summarize", "Fetch"], "Fetch", "Summarize"),
            "Fetch"
        );

        // Two against one for the second candidate.
        assert_eq!(
            vote_majority(&["Summarize", "Summarize", "Fetch"], "Fetch", "Summarize"),
            "Summarize"
        );

        // One vote each plus an unusable answer: the tie goes to the first.
        assert_eq!(
            vote_majority(&["Fetch", "Summarize", "Unknown"], "Fetch", "Summarize"),
            "Fetch"
        );
    }

    // ---- LoRA grouping ----

    /// Builds a minimal request for worker `worker_id` with the given LoRA.
    fn create_test_request(
        worker_id: usize,
        lora: Option<LoraConfig>,
    ) -> (WorkerId, WorkerDecisionRequest) {
        let id = WorkerId(worker_id);

        let context = ResolvedContext::new(
            GlobalContext {
                tick: 0,
                max_ticks: 100,
                progress: 0.0,
                success_rate: 0.0,
                task_description: Some("test".to_string()),
                hint: None,
            },
            ContextTarget::Worker(id),
        );

        let request = WorkerDecisionRequest {
            worker_id: id,
            query: format!("query_{}", worker_id),
            context,
            lora,
        };

        (id, request)
    }

    #[test]
    fn test_group_by_lora_single_group_no_lora() {
        let requests: Vec<_> = (0..3).map(|id| create_test_request(id, None)).collect();

        let groups = group_by_lora(requests);

        assert_eq!(groups.len(), 1);
        assert!(groups.contains_key(&None));
        assert_eq!(groups[&None].len(), 3);
    }

    #[test]
    fn test_group_by_lora_single_group_with_lora() {
        let lora = LoraConfig::with_id(0);
        let requests: Vec<_> = (0..2)
            .map(|id| create_test_request(id, Some(lora.clone())))
            .collect();

        let groups = group_by_lora(requests);

        assert_eq!(groups.len(), 1);
        assert!(groups.contains_key(&Some(lora)));
    }

    #[test]
    fn test_group_by_lora_multiple_groups() {
        let lora_a = LoraConfig::with_id(0);
        let lora_b = LoraConfig::with_id(1);

        let assignment = [
            Some(lora_a.clone()),
            Some(lora_b.clone()),
            Some(lora_a.clone()),
            None,
            Some(lora_b.clone()),
        ];
        let requests: Vec<_> = assignment
            .iter()
            .enumerate()
            .map(|(id, lora)| create_test_request(id, lora.clone()))
            .collect();

        let groups = group_by_lora(requests);

        assert_eq!(groups.len(), 3);
        assert_eq!(groups[&Some(lora_a)].len(), 2);
        assert_eq!(groups[&Some(lora_b)].len(), 2);
        assert_eq!(groups[&None].len(), 1);
    }

    #[test]
    fn test_group_by_lora_preserves_order_within_group() {
        let lora = LoraConfig::with_id(0);
        let requests: Vec<_> = [5, 3, 7]
            .iter()
            .map(|&id| create_test_request(id, Some(lora.clone())))
            .collect();

        let groups = group_by_lora(requests);
        let group = &groups[&Some(lora)];

        // Insertion order, not numeric order, must be kept.
        assert_eq!(group[0].0, WorkerId(5));
        assert_eq!(group[1].0, WorkerId(3));
        assert_eq!(group[2].0, WorkerId(7));
    }

    #[test]
    fn test_group_by_lora_different_scales() {
        // Same adapter id, different scale: these must not share a group.
        let lora_full = LoraConfig::new(0, 1.0);
        let lora_half = LoraConfig::new(0, 0.5);

        let requests = vec![
            create_test_request(0, Some(lora_full.clone())),
            create_test_request(1, Some(lora_half.clone())),
            create_test_request(2, Some(lora_full.clone())),
        ];

        let groups = group_by_lora(requests);

        assert_eq!(groups.len(), 2);
        assert_eq!(groups[&Some(lora_full)].len(), 2);
        assert_eq!(groups[&Some(lora_half)].len(), 1);
    }

    #[test]
    fn test_group_by_lora_empty() {
        let groups = group_by_lora(Vec::new());
        assert!(groups.is_empty());
    }
}