celers_canvas/group.rs
1use crate::{CanvasError, Signature};
2use celers_core::{Broker, SerializedTask};
3use serde::{Deserialize, Serialize};
4use uuid::Uuid;
5
6/// Group: Parallel execution
7///
8/// (task1 | task2 | task3)
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
10pub struct Group {
11 /// Tasks in the group
12 pub tasks: Vec<Signature>,
13
14 /// Group ID
15 pub group_id: Option<Uuid>,
16}
17
18impl Group {
19 pub fn new() -> Self {
20 Self {
21 tasks: Vec::new(),
22 group_id: Some(Uuid::new_v4()),
23 }
24 }
25
26 pub fn add(mut self, task: &str, args: Vec<serde_json::Value>) -> Self {
27 self.tasks
28 .push(Signature::new(task.to_string()).with_args(args));
29 self
30 }
31
32 pub fn add_signature(mut self, signature: Signature) -> Self {
33 self.tasks.push(signature);
34 self
35 }
36
37 /// Apply the group by enqueuing all tasks to the broker
38 pub async fn apply<B: Broker>(self, broker: &B) -> Result<Uuid, CanvasError> {
39 if self.tasks.is_empty() {
40 return Err(CanvasError::Invalid("Group cannot be empty".to_string()));
41 }
42
43 let group_id = self.group_id.unwrap_or_else(Uuid::new_v4);
44
45 // Enqueue all tasks in parallel
46 for sig in self.tasks {
47 // Convert signature to SerializedTask
48 let args_json = serde_json::json!({
49 "args": sig.args,
50 "kwargs": sig.kwargs
51 });
52 let args_bytes = serde_json::to_vec(&args_json)
53 .map_err(|e| CanvasError::Serialization(e.to_string()))?;
54
55 let mut task = SerializedTask::new(sig.task.clone(), args_bytes);
56
57 // Set priority if specified
58 if let Some(priority) = sig.options.priority {
59 task = task.with_priority(priority.into());
60 }
61
62 // Set group_id in metadata (for tracking)
63 task.metadata.group_id = Some(group_id);
64
65 // Enqueue the task
66 broker
67 .enqueue(task)
68 .await
69 .map_err(|e| CanvasError::Broker(e.to_string()))?;
70 }
71
72 Ok(group_id)
73 }
74}
75
76impl Default for Group {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl Group {
83 /// Check if group is empty
84 pub fn is_empty(&self) -> bool {
85 self.tasks.is_empty()
86 }
87
88 /// Get the first task in the group
89 pub fn first(&self) -> Option<&Signature> {
90 self.tasks.first()
91 }
92
93 /// Get the last task in the group
94 pub fn last(&self) -> Option<&Signature> {
95 self.tasks.last()
96 }
97
98 /// Get an iterator over the tasks
99 pub fn iter(&self) -> std::slice::Iter<'_, Signature> {
100 self.tasks.iter()
101 }
102
103 /// Get a mutable iterator over the tasks
104 pub fn iter_mut(&mut self) -> std::slice::IterMut<'_, Signature> {
105 self.tasks.iter_mut()
106 }
107
108 /// Get a task by index
109 pub fn get(&self, index: usize) -> Option<&Signature> {
110 self.tasks.get(index)
111 }
112
113 /// Get a mutable task by index
114 pub fn get_mut(&mut self, index: usize) -> Option<&mut Signature> {
115 self.tasks.get_mut(index)
116 }
117
118 /// Create a group with pre-allocated capacity
119 pub fn with_capacity(capacity: usize) -> Self {
120 Self {
121 tasks: Vec::with_capacity(capacity),
122 group_id: Some(Uuid::new_v4()),
123 }
124 }
125
126 /// Extend the group with additional tasks
127 pub fn extend(mut self, tasks: impl IntoIterator<Item = Signature>) -> Self {
128 self.tasks.extend(tasks);
129 self
130 }
131
132 /// Retain only tasks that satisfy the predicate
133 pub fn retain<F>(mut self, f: F) -> Self
134 where
135 F: FnMut(&Signature) -> bool,
136 {
137 self.tasks.retain(f);
138 self
139 }
140
141 /// Find a task by predicate
142 pub fn find<F>(&self, predicate: F) -> Option<&Signature>
143 where
144 F: Fn(&Signature) -> bool,
145 {
146 self.tasks.iter().find(|sig| predicate(sig))
147 }
148
149 /// Filter tasks by predicate and return a new group
150 pub fn filter<F>(mut self, predicate: F) -> Self
151 where
152 F: FnMut(&Signature) -> bool,
153 {
154 self.tasks.retain(predicate);
155 self
156 }
157
158 /// Stagger task execution with countdown delays
159 ///
160 /// Each task gets a countdown that increases by the specified interval.
161 /// This helps prevent thundering herd problems when launching many tasks.
162 ///
163 /// # Arguments
164 /// * `start` - Initial countdown in seconds for the first task
165 /// * `step` - Increment in seconds for each subsequent task
166 ///
167 /// # Example
168 /// ```
169 /// use celers_canvas::{Group, Signature};
170 ///
171 /// let group = Group::new()
172 /// .add("task1", vec![])
173 /// .add("task2", vec![])
174 /// .add("task3", vec![])
175 /// .skew(0.0, 1.0); // task1: 0s, task2: 1s, task3: 2s
176 ///
177 /// assert_eq!(group.tasks[0].options.countdown, Some(0));
178 /// assert_eq!(group.tasks[1].options.countdown, Some(1));
179 /// assert_eq!(group.tasks[2].options.countdown, Some(2));
180 /// ```
181 pub fn skew(mut self, start: f64, step: f64) -> Self {
182 let mut countdown = start;
183 for task in &mut self.tasks {
184 task.options.countdown = Some(countdown as u64);
185 countdown += step;
186 }
187 self
188 }
189
190 /// Stagger task execution with random jitter
191 ///
192 /// Each task gets a random countdown between 0 and max_delay.
193 /// This provides more even load distribution than linear skew.
194 ///
195 /// # Arguments
196 /// * `max_delay` - Maximum countdown in seconds
197 pub fn jitter(mut self, max_delay: u64) -> Self {
198 use std::collections::hash_map::DefaultHasher;
199 use std::hash::{Hash, Hasher};
200
201 for (i, task) in self.tasks.iter_mut().enumerate() {
202 // Use a deterministic "random" based on task index and name
203 let mut hasher = DefaultHasher::new();
204 i.hash(&mut hasher);
205 task.task.hash(&mut hasher);
206 let hash = hasher.finish();
207 let delay = hash % (max_delay + 1);
208 task.options.countdown = Some(delay);
209 }
210 self
211 }
212
213 /// Get number of tasks in group
214 pub fn len(&self) -> usize {
215 self.tasks.len()
216 }
217
218 /// Check if group ID is set
219 pub fn has_group_id(&self) -> bool {
220 self.group_id.is_some()
221 }
222
223 /// Merge another group into this group
224 ///
225 /// All tasks from the other group are added to this group.
226 ///
227 /// # Example
228 /// ```
229 /// use celers_canvas::Group;
230 ///
231 /// let group1 = Group::new()
232 /// .add("task1", vec![])
233 /// .add("task2", vec![]);
234 ///
235 /// let group2 = Group::new()
236 /// .add("task3", vec![])
237 /// .add("task4", vec![]);
238 ///
239 /// let merged = group1.merge(group2);
240 /// assert_eq!(merged.len(), 4);
241 /// ```
242 pub fn merge(mut self, other: Group) -> Self {
243 self.tasks.extend(other.tasks);
244 self
245 }
246
247 /// Partition tasks into multiple groups based on a predicate
248 ///
249 /// Returns a tuple of (matching, non_matching) groups.
250 ///
251 /// # Example
252 /// ```
253 /// use celers_canvas::{Group, Signature};
254 ///
255 /// let group = Group::new()
256 /// .add_signature(Signature::new("high_priority".to_string()).with_priority(9))
257 /// .add_signature(Signature::new("normal".to_string()).with_priority(5))
258 /// .add_signature(Signature::new("urgent".to_string()).with_priority(9))
259 /// .add_signature(Signature::new("low".to_string()).with_priority(1));
260 ///
261 /// let (high, low) = group.partition(|sig| sig.options.priority.unwrap_or(0) >= 9);
262 /// assert_eq!(high.len(), 2);
263 /// assert_eq!(low.len(), 2);
264 /// ```
265 pub fn partition<F>(self, mut predicate: F) -> (Group, Group)
266 where
267 F: FnMut(&Signature) -> bool,
268 {
269 let (matching, non_matching): (Vec<_>, Vec<_>) =
270 self.tasks.into_iter().partition(|sig| predicate(sig));
271
272 (
273 Group {
274 tasks: matching,
275 group_id: self.group_id,
276 },
277 Group {
278 tasks: non_matching,
279 group_id: None, // Different group
280 },
281 )
282 }
283
284 /// Add a task name prefix to all tasks in the group
285 ///
286 /// # Example
287 /// ```
288 /// use celers_canvas::Group;
289 ///
290 /// let group = Group::new()
291 /// .add("process", vec![])
292 /// .add("validate", vec![]);
293 ///
294 /// let prefixed = group.with_task_prefix("batch_");
295 /// assert_eq!(prefixed.first().unwrap().task, "batch_process");
296 /// ```
297 pub fn with_task_prefix(mut self, prefix: &str) -> Self {
298 for task in &mut self.tasks {
299 task.task = format!("{}{}", prefix, task.task);
300 }
301 self
302 }
303
304 /// Add a task name suffix to all tasks in the group
305 ///
306 /// # Example
307 /// ```
308 /// use celers_canvas::Group;
309 ///
310 /// let group = Group::new()
311 /// .add("process", vec![])
312 /// .add("validate", vec![]);
313 ///
314 /// let suffixed = group.with_task_suffix("_v2");
315 /// assert_eq!(suffixed.first().unwrap().task, "process_v2");
316 /// ```
317 pub fn with_task_suffix(mut self, suffix: &str) -> Self {
318 for task in &mut self.tasks {
319 task.task = format!("{}{}", task.task, suffix);
320 }
321 self
322 }
323
324 /// Set priority on all tasks in the group
325 ///
326 /// # Example
327 /// ```
328 /// use celers_canvas::Group;
329 ///
330 /// let group = Group::new()
331 /// .add("task1", vec![])
332 /// .add("task2", vec![])
333 /// .with_priority(9);
334 ///
335 /// assert_eq!(group.first().unwrap().options.priority, Some(9));
336 /// ```
337 pub fn with_priority(mut self, priority: u8) -> Self {
338 for task in &mut self.tasks {
339 task.options.priority = Some(priority);
340 }
341 self
342 }
343
344 /// Set queue on all tasks in the group
345 ///
346 /// # Example
347 /// ```
348 /// use celers_canvas::Group;
349 ///
350 /// let group = Group::new()
351 /// .add("task1", vec![])
352 /// .add("task2", vec![])
353 /// .with_queue("high_priority".to_string());
354 ///
355 /// assert_eq!(group.first().unwrap().options.queue, Some("high_priority".to_string()));
356 /// ```
357 pub fn with_queue(mut self, queue: String) -> Self {
358 for task in &mut self.tasks {
359 task.options.queue = Some(queue.clone());
360 }
361 self
362 }
363
364 /// Validate that all tasks in the group have non-empty names
365 ///
366 /// Returns true if all tasks are valid, false otherwise.
367 ///
368 /// # Example
369 /// ```
370 /// use celers_canvas::Group;
371 ///
372 /// let valid = Group::new()
373 /// .add("task1", vec![])
374 /// .add("task2", vec![]);
375 /// assert!(valid.is_valid());
376 ///
377 /// let invalid = Group { tasks: vec![], group_id: None };
378 /// assert!(!invalid.is_valid());
379 /// ```
380 pub fn is_valid(&self) -> bool {
381 !self.tasks.is_empty() && self.tasks.iter().all(|t| !t.task.is_empty())
382 }
383
384 /// Count tasks that match a predicate
385 ///
386 /// # Example
387 /// ```
388 /// use celers_canvas::{Group, Signature};
389 ///
390 /// let group = Group::new()
391 /// .add_signature(Signature::new("high".to_string()).with_priority(9))
392 /// .add_signature(Signature::new("low".to_string()).with_priority(1))
393 /// .add_signature(Signature::new("urgent".to_string()).with_priority(9));
394 ///
395 /// let high_priority = group.count_matching(|sig| sig.options.priority.unwrap_or(0) >= 9);
396 /// assert_eq!(high_priority, 2);
397 /// ```
398 pub fn count_matching<F>(&self, predicate: F) -> usize
399 where
400 F: Fn(&Signature) -> bool,
401 {
402 self.tasks.iter().filter(|t| predicate(t)).count()
403 }
404
405 /// Check if any task matches a predicate
406 ///
407 /// # Example
408 /// ```
409 /// use celers_canvas::Group;
410 ///
411 /// let group = Group::new()
412 /// .add("process", vec![])
413 /// .add("validate", vec![]);
414 ///
415 /// assert!(group.any(|sig| sig.task == "validate"));
416 /// assert!(!group.any(|sig| sig.task == "missing"));
417 /// ```
418 pub fn any<F>(&self, predicate: F) -> bool
419 where
420 F: Fn(&Signature) -> bool,
421 {
422 self.tasks.iter().any(predicate)
423 }
424
425 /// Check if all tasks match a predicate
426 ///
427 /// # Example
428 /// ```
429 /// use celers_canvas::Group;
430 ///
431 /// let group = Group::new()
432 /// .add("process", vec![])
433 /// .add("validate", vec![]);
434 ///
435 /// assert!(group.all(|sig| !sig.task.is_empty()));
436 /// ```
437 pub fn all<F>(&self, predicate: F) -> bool
438 where
439 F: Fn(&Signature) -> bool,
440 {
441 self.tasks.iter().all(predicate)
442 }
443
444 /// Map over all tasks, transforming each signature
445 ///
446 /// # Example
447 /// ```
448 /// use celers_canvas::{Group, Signature};
449 ///
450 /// let group = Group::new()
451 /// .add("task1", vec![])
452 /// .add("task2", vec![]);
453 ///
454 /// let modified = group.map_tasks(|sig| {
455 /// Signature::new(format!("parallel_{}", sig.task))
456 /// });
457 ///
458 /// assert_eq!(modified.first().unwrap().task, "parallel_task1");
459 /// ```
460 pub fn map_tasks<F>(mut self, f: F) -> Self
461 where
462 F: FnMut(Signature) -> Signature,
463 {
464 self.tasks = self.tasks.into_iter().map(f).collect();
465 self
466 }
467
468 /// Filter and map tasks in one operation
469 ///
470 /// # Example
471 /// ```
472 /// use celers_canvas::{Group, Signature};
473 ///
474 /// let group = Group::new()
475 /// .add_signature(Signature::new("high".to_string()).with_priority(9))
476 /// .add_signature(Signature::new("low".to_string()).with_priority(1))
477 /// .add_signature(Signature::new("urgent".to_string()).with_priority(9));
478 ///
479 /// let high_priority = group.filter_map(|sig| {
480 /// if sig.options.priority.unwrap_or(0) >= 9 {
481 /// Some(sig)
482 /// } else {
483 /// None
484 /// }
485 /// });
486 ///
487 /// assert_eq!(high_priority.len(), 2);
488 /// ```
489 pub fn filter_map<F>(mut self, f: F) -> Self
490 where
491 F: FnMut(Signature) -> Option<Signature>,
492 {
493 self.tasks = self.tasks.into_iter().filter_map(f).collect();
494 self
495 }
496
497 /// Take the first n tasks from the group
498 ///
499 /// # Example
500 /// ```
501 /// use celers_canvas::Group;
502 ///
503 /// let group = Group::new()
504 /// .add("task1", vec![])
505 /// .add("task2", vec![])
506 /// .add("task3", vec![])
507 /// .add("task4", vec![]);
508 ///
509 /// let first_two = group.take(2);
510 /// assert_eq!(first_two.len(), 2);
511 /// ```
512 pub fn take(mut self, n: usize) -> Self {
513 self.tasks.truncate(n);
514 self
515 }
516
517 /// Skip the first n tasks from the group
518 ///
519 /// # Example
520 /// ```
521 /// use celers_canvas::Group;
522 ///
523 /// let group = Group::new()
524 /// .add("task1", vec![])
525 /// .add("task2", vec![])
526 /// .add("task3", vec![])
527 /// .add("task4", vec![]);
528 ///
529 /// let skipped = group.skip(2);
530 /// assert_eq!(skipped.len(), 2);
531 /// assert_eq!(skipped.first().unwrap().task, "task3");
532 /// ```
533 pub fn skip(mut self, n: usize) -> Self {
534 self.tasks = self.tasks.into_iter().skip(n).collect();
535 self
536 }
537
538 /// Find the index of the first task with the given name
539 ///
540 /// # Example
541 /// ```
542 /// use celers_canvas::Group;
543 ///
544 /// let group = Group::new()
545 /// .add("task1", vec![])
546 /// .add("task2", vec![])
547 /// .add("task1", vec![]);
548 ///
549 /// assert_eq!(group.find_task("task1"), Some(0));
550 /// assert_eq!(group.find_task("task2"), Some(1));
551 /// assert_eq!(group.find_task("task3"), None);
552 /// ```
553 pub fn find_task(&self, task_name: &str) -> Option<usize> {
554 self.tasks.iter().position(|t| t.task == task_name)
555 }
556
557 /// Find all indices of tasks with the given name
558 ///
559 /// # Example
560 /// ```
561 /// use celers_canvas::Group;
562 ///
563 /// let group = Group::new()
564 /// .add("task1", vec![])
565 /// .add("task2", vec![])
566 /// .add("task1", vec![]);
567 ///
568 /// assert_eq!(group.find_all_tasks("task1"), vec![0, 2]);
569 /// assert_eq!(group.find_all_tasks("task2"), vec![1]);
570 /// ```
571 pub fn find_all_tasks(&self, task_name: &str) -> Vec<usize> {
572 self.tasks
573 .iter()
574 .enumerate()
575 .filter(|(_, t)| t.task == task_name)
576 .map(|(i, _)| i)
577 .collect()
578 }
579
580 /// Check if the group contains a task with the given name
581 ///
582 /// # Example
583 /// ```
584 /// use celers_canvas::Group;
585 ///
586 /// let group = Group::new()
587 /// .add("task1", vec![])
588 /// .add("task2", vec![]);
589 ///
590 /// assert!(group.contains_task("task1"));
591 /// assert!(!group.contains_task("task3"));
592 /// ```
593 pub fn contains_task(&self, task_name: &str) -> bool {
594 self.tasks.iter().any(|t| t.task == task_name)
595 }
596
597 /// Get the maximum estimated duration in seconds based on task time limits
598 ///
599 /// Since tasks run in parallel, the group duration is the maximum of all task durations.
600 /// Returns None if no tasks have time limits set.
601 ///
602 /// # Example
603 /// ```
604 /// use celers_canvas::{Group, Signature};
605 ///
606 /// let group = Group::new()
607 /// .add_signature(Signature::new("task1".to_string()).with_time_limit(10))
608 /// .add_signature(Signature::new("task2".to_string()).with_time_limit(20));
609 ///
610 /// assert_eq!(group.estimated_duration(), Some(20)); // Max of 10 and 20
611 /// ```
612 pub fn estimated_duration(&self) -> Option<u64> {
613 self.tasks
614 .iter()
615 .filter_map(|t| t.options.time_limit.or(t.options.soft_time_limit))
616 .max()
617 }
618
619 /// Get a summary of all task names in the group
620 ///
621 /// # Example
622 /// ```
623 /// use celers_canvas::Group;
624 ///
625 /// let group = Group::new()
626 /// .add("fetch", vec![])
627 /// .add("process", vec![])
628 /// .add("save", vec![]);
629 ///
630 /// assert_eq!(group.task_names(), vec!["fetch", "process", "save"]);
631 /// ```
632 pub fn task_names(&self) -> Vec<&str> {
633 self.tasks.iter().map(|t| t.task.as_str()).collect()
634 }
635
636 /// Get all unique task names in the group
637 ///
638 /// # Example
639 /// ```
640 /// use celers_canvas::Group;
641 ///
642 /// let group = Group::new()
643 /// .add("task1", vec![])
644 /// .add("task2", vec![])
645 /// .add("task1", vec![]);
646 ///
647 /// let unique = group.unique_task_names();
648 /// assert_eq!(unique.len(), 2);
649 /// assert!(unique.contains(&"task1"));
650 /// assert!(unique.contains(&"task2"));
651 /// ```
652 pub fn unique_task_names(&self) -> std::collections::HashSet<&str> {
653 self.tasks.iter().map(|t| t.task.as_str()).collect()
654 }
655
656 /// Clone the group with a transformation applied to each task
657 ///
658 /// # Example
659 /// ```
660 /// use celers_canvas::Group;
661 ///
662 /// let group = Group::new()
663 /// .add("task1", vec![])
664 /// .add("task2", vec![]);
665 ///
666 /// let prioritized = group.clone_with_transform(|sig| {
667 /// sig.clone().with_priority(5)
668 /// });
669 ///
670 /// assert!(prioritized.tasks.iter().all(|t| t.options.priority == Some(5)));
671 /// ```
672 pub fn clone_with_transform<F>(&self, mut transform: F) -> Self
673 where
674 F: FnMut(&Signature) -> Signature,
675 {
676 Self {
677 tasks: self.tasks.iter().map(&mut transform).collect(),
678 group_id: self.group_id,
679 }
680 }
681
682 /// Count tasks by priority level
683 ///
684 /// Returns a map of priority values to the count of tasks at that priority.
685 ///
686 /// # Example
687 /// ```
688 /// use celers_canvas::{Group, Signature};
689 ///
690 /// let group = Group::new()
691 /// .add_signature(Signature::new("task1".to_string()).with_priority(1))
692 /// .add_signature(Signature::new("task2".to_string()).with_priority(5))
693 /// .add_signature(Signature::new("task3".to_string()).with_priority(1));
694 ///
695 /// let counts = group.count_by_priority();
696 /// assert_eq!(counts.get(&1), Some(&2));
697 /// assert_eq!(counts.get(&5), Some(&1));
698 /// ```
699 pub fn count_by_priority(&self) -> std::collections::HashMap<u8, usize> {
700 let mut counts = std::collections::HashMap::new();
701 for task in &self.tasks {
702 if let Some(priority) = task.options.priority {
703 *counts.entry(priority).or_insert(0) += 1;
704 }
705 }
706 counts
707 }
708
709 /// Count tasks by queue name
710 ///
711 /// Returns a map of queue names to the count of tasks targeting that queue.
712 ///
713 /// # Example
714 /// ```
715 /// use celers_canvas::{Group, Signature};
716 ///
717 /// let group = Group::new()
718 /// .add_signature(Signature::new("task1".to_string()).with_queue("queue_a".to_string()))
719 /// .add_signature(Signature::new("task2".to_string()).with_queue("queue_b".to_string()))
720 /// .add_signature(Signature::new("task3".to_string()).with_queue("queue_a".to_string()));
721 ///
722 /// let counts = group.count_by_queue();
723 /// assert_eq!(counts.get("queue_a"), Some(&2));
724 /// assert_eq!(counts.get("queue_b"), Some(&1));
725 /// ```
726 pub fn count_by_queue(&self) -> std::collections::HashMap<String, usize> {
727 let mut counts = std::collections::HashMap::new();
728 for task in &self.tasks {
729 if let Some(ref queue) = task.options.queue {
730 *counts.entry(queue.clone()).or_insert(0) += 1;
731 }
732 }
733 counts
734 }
735}
736
737impl std::fmt::Display for Group {
738 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
739 write!(f, "Group[{} tasks]", self.tasks.len())?;
740 if let Some(group_id) = self.group_id {
741 write!(f, " id={}", &group_id.to_string()[..8])?;
742 }
743 Ok(())
744 }
745}
746
747impl IntoIterator for Group {
748 type Item = Signature;
749 type IntoIter = std::vec::IntoIter<Signature>;
750
751 fn into_iter(self) -> Self::IntoIter {
752 self.tasks.into_iter()
753 }
754}
755
756impl<'a> IntoIterator for &'a Group {
757 type Item = &'a Signature;
758 type IntoIter = std::slice::Iter<'a, Signature>;
759
760 fn into_iter(self) -> Self::IntoIter {
761 self.tasks.iter()
762 }
763}
764
765impl From<Vec<Signature>> for Group {
766 fn from(tasks: Vec<Signature>) -> Self {
767 Self {
768 tasks,
769 group_id: Some(Uuid::new_v4()),
770 }
771 }
772}
773
774impl FromIterator<Signature> for Group {
775 fn from_iter<T: IntoIterator<Item = Signature>>(iter: T) -> Self {
776 Self {
777 tasks: iter.into_iter().collect(),
778 group_id: Some(Uuid::new_v4()),
779 }
780 }
781}