arpa_dal/cache.rs

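//! In-memory implementations of the data access layer caches: block info,
//! node info, group info, BLS task queues, and signature result caches.
//! Each type implements the corresponding fetcher/updater/handler traits
//! defined in this crate.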
use crate::error::{DataAccessResult, GroupError, NodeInfoError};
use crate::{
    BLSResultCacheState, BLSTasksHandler, BlockInfoHandler, ContextInfoUpdater, GroupInfoHandler,
    NodeInfoHandler, SignatureResultCacheHandler,
};

use super::{
    BLSTasksFetcher, BLSTasksUpdater, BlockInfoFetcher, BlockInfoUpdater, GroupInfoFetcher,
    GroupInfoUpdater, NodeInfoFetcher, NodeInfoUpdater, ResultCache, SignatureResultCacheFetcher,
    SignatureResultCacheUpdater,
};
use arpa_core::log::encoder;
use arpa_core::{BLSTask, BLSTaskError, DKGStatus, DKGTask, Group, Member, RandomnessTask, Task};
use async_trait::async_trait;
use dkg_core::primitives::DKGOutput;
use ethers_core::types::Address;
use log::info;
use std::collections::{BTreeMap, HashMap};
use threshold_bls::group::{Curve, Element};
use threshold_bls::serialize::point_to_hex;
use threshold_bls::sig::Share;

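/// In-memory cache of per-chain block metadata: the chain id, the configured
/// block time, and the latest observed block height.
///
/// A minimal usage sketch (chain id `1` and a 12-second block time are
/// illustrative values, not defaults of this crate):
///
/// ```ignore
/// let mut cache = InMemoryBlockInfoCache::new(1, 12);
/// cache.set_block_height(100);
/// assert_eq!(cache.get_block_height(), 100);
/// assert_eq!(cache.get_block_time(), 12);
/// ```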
#[derive(Debug, Default)]
pub struct InMemoryBlockInfoCache {
    chain_id: usize,
    block_height: usize,
    block_time: usize,
}

impl InMemoryBlockInfoCache {
    pub fn new(chain_id: usize, block_time: usize) -> Self {
        InMemoryBlockInfoCache {
            chain_id,
            block_height: 0,
            block_time,
        }
    }
}

impl BlockInfoHandler for InMemoryBlockInfoCache {}

impl BlockInfoFetcher for InMemoryBlockInfoCache {
    fn get_chain_id(&self) -> usize {
        self.chain_id
    }

    fn get_block_height(&self) -> usize {
        self.block_height
    }

    fn get_block_time(&self) -> usize {
        self.block_time
    }
}

impl BlockInfoUpdater for InMemoryBlockInfoCache {
    fn set_block_height(&mut self, block_height: usize) {
        self.block_height = block_height;
    }
}

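/// In-memory cache of this node's identity: its on-chain address, RPC
/// endpoint, and DKG key pair. The endpoint and key pair start out unset, and
/// the fetchers return `NodeInfoError::NoRpcEndpoint` /
/// `NodeInfoError::NoDKGKeyPair` until they are populated through the updater.
///
/// A minimal sketch, assuming `SomeCurve` is a concrete `Curve` implementation
/// available to the caller (a hypothetical placeholder name):
///
/// ```ignore
/// let mut cache = InMemoryNodeInfoCache::<SomeCurve>::new(id_address);
/// cache.set_node_rpc_endpoint("https://127.0.0.1:50061".to_string()).await?;
/// assert!(cache.get_node_rpc_endpoint().is_ok());
/// ```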
#[derive(Clone)]
pub struct InMemoryNodeInfoCache<C: Curve> {
    pub(crate) id_address: Address,
    pub(crate) node_rpc_endpoint: Option<String>,
    pub(crate) dkg_private_key: Option<C::Scalar>,
    pub(crate) dkg_public_key: Option<C::Point>,
}

impl<C: Curve> std::fmt::Debug for InMemoryNodeInfoCache<C> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InMemoryNodeInfoCache")
            .field("id_address", &self.id_address)
            .field("node_rpc_endpoint", &self.node_rpc_endpoint)
            .field("dkg_private_key", &"ignored")
            .field(
                "dkg_public_key",
                &(self.dkg_public_key.as_ref()).map(point_to_hex),
            )
            .finish()
    }
}

impl<C: Curve> InMemoryNodeInfoCache<C> {
    pub fn new(id_address: Address) -> Self {
        InMemoryNodeInfoCache {
            id_address,
            node_rpc_endpoint: None,
            dkg_private_key: None,
            dkg_public_key: None,
        }
    }

    pub fn rebuild(
        id_address: Address,
        node_rpc_endpoint: String,
        dkg_private_key: C::Scalar,
        dkg_public_key: C::Point,
    ) -> Self {
        InMemoryNodeInfoCache {
            id_address,
            node_rpc_endpoint: Some(node_rpc_endpoint),
            dkg_private_key: Some(dkg_private_key),
            dkg_public_key: Some(dkg_public_key),
        }
    }
}

impl<C: Curve> ContextInfoUpdater for InMemoryNodeInfoCache<C> {
    fn refresh_context_entry(&self) {
        encoder::CONTEXT_INFO.write()[0] = format!("{:?}", &self);
    }
}

#[async_trait]
impl<C: Curve> NodeInfoUpdater<C> for InMemoryNodeInfoCache<C> {
    async fn set_node_rpc_endpoint(&mut self, node_rpc_endpoint: String) -> DataAccessResult<()> {
        self.node_rpc_endpoint = Some(node_rpc_endpoint);
        self.refresh_context_entry();
        Ok(())
    }

    async fn set_dkg_key_pair(
        &mut self,
        dkg_private_key: C::Scalar,
        dkg_public_key: C::Point,
    ) -> DataAccessResult<()> {
        self.dkg_private_key = Some(dkg_private_key);
        self.dkg_public_key = Some(dkg_public_key);
        self.refresh_context_entry();
        Ok(())
    }
}

impl<C: Curve> NodeInfoFetcher<C> for InMemoryNodeInfoCache<C> {
    fn get_id_address(&self) -> DataAccessResult<Address> {
        Ok(self.id_address)
    }

    fn get_node_rpc_endpoint(&self) -> DataAccessResult<&str> {
        self.node_rpc_endpoint
            .as_deref()
            .ok_or_else(|| NodeInfoError::NoRpcEndpoint.into())
    }

    fn get_dkg_private_key(&self) -> DataAccessResult<&C::Scalar> {
        self.dkg_private_key
            .as_ref()
            .ok_or_else(|| NodeInfoError::NoDKGKeyPair.into())
    }

    fn get_dkg_public_key(&self) -> DataAccessResult<&C::Point> {
        self.dkg_public_key
            .as_ref()
            .ok_or_else(|| NodeInfoError::NoDKGKeyPair.into())
    }
}

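/// In-memory cache of the group this node belongs to, together with this
/// node's secret share of the group key. Accessors return
/// `GroupError::NoGroupTask` until a DKG task has been saved, i.e. while
/// `dkg_start_block_height` is still zero.
///
/// A sketch of the expected call order across a DKG round, inferred from the
/// updater API below (`dkg_task`, `next_status`, `dkg_output` and `committers`
/// stand in for values produced elsewhere in the node):
///
/// ```ignore
/// cache.save_task_info(self_index, dkg_task).await?;
/// cache.update_dkg_status(index, epoch, next_status).await?;
/// cache.save_successful_output(index, epoch, dkg_output).await?;
/// cache.save_committers(index, epoch, committers).await?;
/// ```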
#[derive(Clone)]
pub struct InMemoryGroupInfoCache<C: Curve> {
    pub(crate) share: Option<Share<C::Scalar>>,
    pub(crate) group: Group<C>,
    pub(crate) dkg_status: DKGStatus,
    pub(crate) self_index: usize,
    pub(crate) dkg_start_block_height: usize,
}

impl<C: Curve> Default for InMemoryGroupInfoCache<C> {
    fn default() -> Self {
        Self::new()
    }
}

impl<C: Curve> std::fmt::Debug for InMemoryGroupInfoCache<C> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InMemoryGroupInfoCache")
            .field("share", &"ignored")
            .field("group", &self.group)
            .field("dkg_status", &self.dkg_status)
            .field("self_index", &self.self_index)
            .field("dkg_start_block_height", &self.dkg_start_block_height)
            .finish()
    }
}

impl<C: Curve> InMemoryGroupInfoCache<C> {
    pub fn new() -> Self {
        let group: Group<C> = Group::new();

        InMemoryGroupInfoCache {
            share: None,
            group,
            dkg_status: DKGStatus::None,
            self_index: 0,
            dkg_start_block_height: 0,
        }
    }

    pub fn rebuild(
        share: Option<Share<C::Scalar>>,
        group: Group<C>,
        dkg_status: DKGStatus,
        self_index: usize,
        dkg_start_block_height: usize,
    ) -> Self {
        InMemoryGroupInfoCache {
            share,
            group,
            dkg_status,
            self_index,
            dkg_start_block_height,
        }
    }

    fn only_has_group_task(&self) -> DataAccessResult<()> {
        if self.dkg_start_block_height == 0 {
            return Err(GroupError::NoGroupTask.into());
        }

        Ok(())
    }
}

impl<C: Curve> ContextInfoUpdater for InMemoryGroupInfoCache<C> {
    fn refresh_context_entry(&self) {
        encoder::CONTEXT_INFO.write()[1] = format!("{:?}", &self);
    }
}

#[async_trait]
impl<C: Curve> GroupInfoUpdater<C> for InMemoryGroupInfoCache<C> {
    async fn update_dkg_status(
        &mut self,
        index: usize,
        epoch: usize,
        dkg_status: DKGStatus,
    ) -> DataAccessResult<bool> {
        self.only_has_group_task()?;

        if self.group.index != index {
            return Err(GroupError::GroupIndexObsolete(self.group.index).into());
        }

        if self.group.epoch != epoch {
            return Err(GroupError::GroupEpochObsolete(self.group.epoch).into());
        }

        if self.dkg_status == dkg_status {
            return Ok(false);
        }

        info!(
            "dkg_status transferred from {:?} to {:?}",
            self.dkg_status, dkg_status
        );

        self.dkg_status = dkg_status;

        self.refresh_context_entry();

        Ok(true)
    }

    async fn save_task_info(&mut self, self_index: usize, task: DKGTask) -> DataAccessResult<()> {
        self.self_index = self_index;

        self.group.index = task.group_index;

        self.group.epoch = task.epoch;

        self.group.size = task.size;

        self.group.threshold = task.threshold;

        self.group.public_key = None;

        self.group.state = false;

        self.group.members.clear();

        self.group.committers.clear();

        self.dkg_start_block_height = task.assignment_block_height;

        task.members
            .iter()
            .enumerate()
            .for_each(|(index, address)| {
                let member = Member {
                    index,
                    id_address: *address,
                    rpc_endpoint: None,
                    partial_public_key: None,
                };
                self.group.members.insert(*address, member);
            });

        self.refresh_context_entry();

        Ok(())
    }

    async fn save_successful_output(
        &mut self,
        index: usize,
        epoch: usize,
        output: DKGOutput<C>,
    ) -> DataAccessResult<(C::Point, C::Point, Vec<Address>)> {
        self.only_has_group_task()?;

        if self.group.index != index {
            return Err(GroupError::GroupIndexObsolete(self.group.index).into());
        }

        if self.group.epoch != epoch {
            return Err(GroupError::GroupEpochObsolete(self.group.epoch).into());
        }

        if self.group.state {
            return Err(GroupError::GroupAlreadyReady.into());
        }

        // Member indices start from 0, matching the node ids in the DKG output.
        let qualified_node_indices = output
            .qual
            .nodes
            .iter()
            .map(|node| node.id() as usize)
            .collect::<Vec<_>>();

        let disqualified_nodes = self
            .group
            .members
            .iter()
            .filter(|(_, member)| !qualified_node_indices.contains(&member.index))
            .map(|(id_address, _)| *id_address)
            .collect::<Vec<_>>();

        let public_key = output.public.public_key().clone();

        let mut partial_public_key = C::Point::new();

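        // Serialize/deserialize round-trips convert the DKG output's share and
        // evaluated points into this crate's `Share`/point types (assuming, as
        // the original code does, that the two layouts are bincode-compatible).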
        let share = bincode::deserialize(&bincode::serialize(&output.share)?)?;

        self.share = Some(share);
        self.group.size = qualified_node_indices.len();
        self.group
            .members
            .retain(|node, _| !disqualified_nodes.contains(node));
        self.group.public_key = Some(public_key.clone());

        for (_, member) in self.group.members.iter_mut() {
            if let Some(node) = output
                .qual
                .nodes
                .iter()
                .find(|node| member.index == node.id() as usize)
            {
                if let Some(rpc_endpoint) = node.get_rpc_endpoint() {
                    member.rpc_endpoint = Some(rpc_endpoint.to_string());
                }
            }

            let member_partial_public_key = bincode::deserialize(&bincode::serialize(
                &output.public.eval(member.index as u32).value,
            )?)?;
            member.partial_public_key = Some(member_partial_public_key);

            if self.self_index == member.index {
                partial_public_key = member.partial_public_key.clone().unwrap();
            }
        }

        self.refresh_context_entry();

        Ok((public_key, partial_public_key, disqualified_nodes))
    }

    async fn save_failed_output(
        &mut self,
        index: usize,
        epoch: usize,
        disqualified_node_indices: Vec<u32>,
    ) -> DataAccessResult<Vec<Address>> {
        self.only_has_group_task()?;

        if self.group.index != index {
            return Err(GroupError::GroupIndexObsolete(self.group.index).into());
        }

        if self.group.epoch != epoch {
            return Err(GroupError::GroupEpochObsolete(self.group.epoch).into());
        }

        if self.group.state {
            return Err(GroupError::GroupAlreadyReady.into());
        }

        // remove disqualified nodes from members
        let disqualified_nodes = self
            .group
            .members
            .iter()
            .filter(|(_, member)| disqualified_node_indices.contains(&(member.index as u32)))
            .map(|(id_address, _)| *id_address)
            .collect::<Vec<_>>();

        self.group.remove_disqualified_nodes(&disqualified_nodes);

        self.group.size = self.group.members.len();

        self.refresh_context_entry();

        Ok(disqualified_nodes)
    }

    async fn save_committers(
        &mut self,
        index: usize,
        epoch: usize,
        committers: Vec<Address>,
    ) -> DataAccessResult<()> {
        self.only_has_group_task()?;

        if self.group.index != index {
            return Err(GroupError::GroupIndexObsolete(self.group.index).into());
        }

        if self.group.epoch != epoch {
            return Err(GroupError::GroupEpochObsolete(self.group.epoch).into());
        }

        if self.group.state {
            return Err(GroupError::GroupAlreadyReady.into());
        }

        self.group.committers = committers;

        self.group.state = true;

        self.refresh_context_entry();

        Ok(())
    }

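    /// Reconciles the cached members with the on-chain view: returns
    /// `Ok(true)` when the member counts already match, otherwise prunes
    /// cached members missing from the input and returns `Ok(false)`.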
    async fn sync_up_members(
        &mut self,
        index: usize,
        epoch: usize,
        members: BTreeMap<Address, Member<C>>,
    ) -> DataAccessResult<bool> {
        self.only_has_group_task()?;

        if self.group.index != index {
            return Err(GroupError::GroupIndexObsolete(self.group.index).into());
        }

        if self.group.epoch != epoch {
            return Err(GroupError::GroupEpochObsolete(self.group.epoch).into());
        }

        if self.group.state {
            return Err(GroupError::GroupAlreadyReady.into());
        }

        if self.group.members.len() != members.len() {
            // Drop cached members that are no longer present in the input,
            // keeping the original indices of those that remain.
            self.group
                .members
                .retain(|id_address, _| members.contains_key(id_address));

            self.group.size = self.group.members.len();

            self.refresh_context_entry();

            return Ok(false);
        }

        Ok(true)
    }
}

impl<C: Curve> GroupInfoFetcher<C> for InMemoryGroupInfoCache<C> {
    fn get_group(&self) -> DataAccessResult<&Group<C>> {
        self.only_has_group_task()?;

        Ok(&self.group)
    }

    fn get_index(&self) -> DataAccessResult<usize> {
        self.only_has_group_task()?;

        Ok(self.group.index)
    }

    fn get_epoch(&self) -> DataAccessResult<usize> {
        self.only_has_group_task()?;

        Ok(self.group.epoch)
    }

    fn get_size(&self) -> DataAccessResult<usize> {
        self.only_has_group_task()?;

        Ok(self.group.size)
    }

    fn get_threshold(&self) -> DataAccessResult<usize> {
        self.only_has_group_task()?;

        Ok(self.group.threshold)
    }

    fn get_state(&self) -> DataAccessResult<bool> {
        self.only_has_group_task()?;

        Ok(self.group.state)
    }

    fn get_self_index(&self) -> DataAccessResult<usize> {
        self.only_has_group_task()?;

        Ok(self.self_index)
    }

    fn get_public_key(&self) -> DataAccessResult<&C::Point> {
        self.only_has_group_task()?;

        self.group
            .public_key
            .as_ref()
            .ok_or(GroupError::GroupNotExisted)
            .map_err(|e| e.into())
    }

    fn get_secret_share(&self) -> DataAccessResult<&Share<C::Scalar>> {
        self.only_has_group_task()?;

        self.share
            .as_ref()
            .ok_or(GroupError::GroupNotReady)
            .map_err(|e| e.into())
    }

    fn get_members(&self) -> DataAccessResult<&BTreeMap<Address, Member<C>>> {
        self.only_has_group_task()?;

        Ok(&self.group.members)
    }

    fn get_member(&self, id_address: Address) -> DataAccessResult<&Member<C>> {
        self.only_has_group_task()?;

        self.group
            .members
            .get(&id_address)
            .ok_or(GroupError::MemberNotExisted)
            .map_err(|e| e.into())
    }

    fn get_committers(&self) -> DataAccessResult<Vec<Address>> {
        self.only_has_group_task()?;

        Ok(self.group.committers.clone())
    }

    fn get_dkg_start_block_height(&self) -> DataAccessResult<usize> {
        self.only_has_group_task()?;

        Ok(self.dkg_start_block_height)
    }

    fn get_dkg_status(&self) -> DataAccessResult<DKGStatus> {
        self.only_has_group_task()?;

        Ok(self.dkg_status)
    }

    fn is_committer(&self, id_address: Address) -> DataAccessResult<bool> {
        self.only_has_group_task()?;

        Ok(self.group.committers.contains(&id_address))
    }
}

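/// In-memory queue of BLS tasks keyed by request id. A task's `state` flips to
/// `true` once it has been handed out as available, so each task is fetched at
/// most once.
///
/// A minimal sketch (`randomness_task`, `block_height`, `group_index` and
/// `exclusive_window` are assumed to come from the surrounding node logic):
///
/// ```ignore
/// queue.add(randomness_task).await?;
/// let ready = queue
///     .check_and_get_available_tasks(block_height, group_index, exclusive_window)
///     .await?;
/// ```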
#[derive(Default, Debug, Clone)]
pub struct InMemoryBLSTasksQueue<T: Task> {
    bls_tasks: HashMap<Vec<u8>, BLSTask<T>>,
}

impl<T: Task> InMemoryBLSTasksQueue<T> {
    pub fn new() -> Self {
        InMemoryBLSTasksQueue {
            bls_tasks: HashMap::new(),
        }
    }
}

#[async_trait]
impl<T: Task + Sync + Clone> BLSTasksFetcher<T> for InMemoryBLSTasksQueue<T> {
    async fn contains(&self, task_request_id: &[u8]) -> DataAccessResult<bool> {
        Ok(self.bls_tasks.contains_key(task_request_id))
    }

    async fn get(&self, task_request_id: &[u8]) -> DataAccessResult<T> {
        self.bls_tasks
            .get(task_request_id)
            .map(|task| task.task.clone())
            .ok_or_else(|| BLSTaskError::TaskNotFound.into())
    }

    async fn is_handled(&self, task_request_id: &[u8]) -> DataAccessResult<bool> {
        Ok(*self
            .bls_tasks
            .get(task_request_id)
            .map(|task| &task.state)
            .unwrap_or(&false))
    }
}

#[async_trait]
impl BLSTasksUpdater<RandomnessTask> for InMemoryBLSTasksQueue<RandomnessTask> {
    async fn add(&mut self, task: RandomnessTask) -> DataAccessResult<()> {
        self.bls_tasks
            .insert(task.request_id().to_vec(), BLSTask { task, state: false });

        Ok(())
    }

    async fn check_and_get_available_tasks(
        &mut self,
        current_block_height: usize,
        current_group_index: usize,
        randomness_task_exclusive_window: usize,
    ) -> DataAccessResult<Vec<RandomnessTask>> {
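        // A task is available when it is unhandled and either (a) assigned to
        // this group, or (b) past its exclusive window, after which any group
        // may pick it up. Fetched tasks are marked handled in the same pass.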
        let available_tasks = self
            .bls_tasks
            .iter_mut()
            .filter(|(_, task)| !task.state)
            .filter(|(_, task)| {
                task.task.group_index == current_group_index as u32
                    || current_block_height
                        > task.task.assignment_block_height + randomness_task_exclusive_window
            })
            .map(|(_, task)| {
                task.state = true;
                task.task.clone()
            })
            .collect::<Vec<_>>();

        Ok(available_tasks)
    }
}

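/// In-memory cache of per-task signature aggregation state, keyed by request
/// id. Partial signatures are collected until a task's threshold is reached,
/// at which point the entry becomes eligible for committing.
///
/// A sketch of the committer flow (identifiers stand in for values produced by
/// the BLS signing pipeline):
///
/// ```ignore
/// cache.add(group_index, task, message, threshold).await?;
/// cache.add_partial_signature(request_id, member_address, partial_sig).await?;
/// let ready = cache.get_ready_to_commit_signatures(block_height).await?;
/// ```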
#[derive(Debug, Default, Clone)]
pub struct InMemorySignatureResultCache<C: ResultCache> {
    signature_result_caches: BTreeMap<Vec<u8>, BLSResultCache<C>>,
}

impl<C: ResultCache> InMemorySignatureResultCache<C> {
    pub fn new() -> Self {
        InMemorySignatureResultCache {
            signature_result_caches: BTreeMap::new(),
        }
    }

    pub fn rebuild(results: Vec<BLSResultCache<C>>) -> Self {
        let mut cache = InMemorySignatureResultCache::new();

        for result in results {
            cache
                .signature_result_caches
                .insert(result.result_cache.request_id().to_vec(), result);
        }

        cache
    }
}

impl Task for RandomnessResultCache {
    fn request_id(&self) -> &[u8] {
        &self.randomness_task.request_id
    }
}

impl ResultCache for RandomnessResultCache {
    type Task = RandomnessTask;
    type M = Vec<u8>;
}

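/// A cached signature result paired with its commit state.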
#[derive(Debug, Clone)]
pub struct BLSResultCache<C: ResultCache> {
    pub result_cache: C,
    pub state: BLSResultCacheState,
}

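/// Aggregation state for a single randomness task: the message being signed,
/// the partial signatures collected so far, and how many times a commit for
/// this task has been attempted.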
#[derive(Clone, Debug)]
pub struct RandomnessResultCache {
    pub group_index: usize,
    pub randomness_task: RandomnessTask,
    pub message: Vec<u8>,
    pub threshold: usize,
    pub partial_signatures: BTreeMap<Address, Vec<u8>>,
    pub committed_times: usize,
}

#[async_trait]
impl<C: ResultCache + Send + Sync> SignatureResultCacheFetcher<C>
    for InMemorySignatureResultCache<C>
{
    async fn contains(&self, task_request_id: &[u8]) -> DataAccessResult<bool> {
        Ok(self.signature_result_caches.contains_key(task_request_id))
    }

    async fn get(&self, task_request_id: &[u8]) -> DataAccessResult<BLSResultCache<C>> {
        self.signature_result_caches
            .get(task_request_id)
            .cloned()
            .ok_or_else(|| BLSTaskError::CommitterCacheNotExisted.into())
    }
}

#[async_trait]
impl SignatureResultCacheUpdater<RandomnessResultCache>
    for InMemorySignatureResultCache<RandomnessResultCache>
{
    async fn add(
        &mut self,
        group_index: usize,
        task: RandomnessTask,
        message: Vec<u8>,
        threshold: usize,
    ) -> DataAccessResult<bool> {
        let signature_result_cache = RandomnessResultCache {
            group_index,
            randomness_task: task,
            message,
            threshold,
            partial_signatures: BTreeMap::new(),
            committed_times: 0,
        };

        if self
            .signature_result_caches
            .contains_key(&signature_result_cache.randomness_task.request_id)
        {
            return Ok(false);
        }

        self.signature_result_caches.insert(
            signature_result_cache.randomness_task.request_id.clone(),
            BLSResultCache {
                result_cache: signature_result_cache,
                state: BLSResultCacheState::NotCommitted,
            },
        );

        Ok(true)
    }

    async fn add_partial_signature(
        &mut self,
        task_request_id: Vec<u8>,
        member_address: Address,
        partial_signature: Vec<u8>,
    ) -> DataAccessResult<bool> {
        let signature_result_cache = self
            .signature_result_caches
            .get_mut(&task_request_id)
            .ok_or(BLSTaskError::CommitterCacheNotExisted)?;

        if signature_result_cache
            .result_cache
            .partial_signatures
            .contains_key(&member_address)
        {
            return Ok(false);
        }

        signature_result_cache
            .result_cache
            .partial_signatures
            .insert(member_address, partial_signature);

        Ok(true)
    }

    async fn get_ready_to_commit_signatures(
        &mut self,
        current_block_height: usize,
    ) -> DataAccessResult<Vec<RandomnessResultCache>> {
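        // An entry is ready once `request_confirmations` blocks have elapsed
        // since its assignment block, it has not been committed yet, and at
        // least `threshold` partial signatures have been collected. Returned
        // entries are moved to `Committing` so they are not handed out twice.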
        let ready_to_commit_signatures = self
            .signature_result_caches
            .values_mut()
            .filter(|v| {
                ((current_block_height + 1)
                    >= v.result_cache.randomness_task.assignment_block_height
                        + v.result_cache.randomness_task.request_confirmations as usize)
                    && v.state == BLSResultCacheState::NotCommitted
                    && v.result_cache.partial_signatures.len() >= v.result_cache.threshold
            })
            .map(|v| {
                v.state = BLSResultCacheState::Committing;
                v.result_cache.clone()
            })
            .collect::<Vec<_>>();

        Ok(ready_to_commit_signatures)
    }

    async fn update_commit_result(
        &mut self,
        task_request_id: &[u8],
        status: BLSResultCacheState,
    ) -> DataAccessResult<()> {
        let signature_result_cache = self
            .signature_result_caches
            .get_mut(task_request_id)
            .ok_or(BLSTaskError::CommitterCacheNotExisted)?;

        signature_result_cache.state = status;

        Ok(())
    }

    async fn incr_committed_times(&mut self, task_request_id: &[u8]) -> DataAccessResult<()> {
        let signature_result_cache = self
            .signature_result_caches
            .get_mut(task_request_id)
            .ok_or(BLSTaskError::CommitterCacheNotExisted)?;

        signature_result_cache.result_cache.committed_times += 1;

        Ok(())
    }
}

impl<PC: Curve + 'static> NodeInfoHandler<PC> for InMemoryNodeInfoCache<PC> {}
impl<PC: Curve + 'static> GroupInfoHandler<PC> for InMemoryGroupInfoCache<PC> {}
impl BLSTasksHandler<RandomnessTask> for InMemoryBLSTasksQueue<RandomnessTask> {}
impl SignatureResultCacheHandler<RandomnessResultCache>
    for InMemorySignatureResultCache<RandomnessResultCache>
{
}