scatter_net/scatter_net/fetch/implementations/
future.rs1use std::{
2 future::Future,
3 task::Poll::{Pending, Ready},
4};
5
6use n0_future::FutureExt;
7use ps_datachunk::{DataChunk, OwnedDataChunk};
8use ps_hkey::AsyncStore;
9
10use crate::{Fetch, FetchError, FetchInnerWritable};
11
12impl Future for Fetch {
13 type Output = Result<OwnedDataChunk, FetchError>;
14
15 fn poll(
16 self: std::pin::Pin<&mut Self>,
17 cx: &mut std::task::Context<'_>,
18 ) -> std::task::Poll<Self::Output> {
19 let mut guard = self.write();
20
21 if let FetchInnerWritable::Done { chunk } = &*guard {
22 return Ready(Ok(chunk.clone()));
23 }
24
25 if let FetchInnerWritable::Initial { net } = &*guard {
26 if let Ok(chunk) = net.lake.get_encrypted_chunk(&self.hash) {
28 let chunk = chunk.into_owned();
29
30 *guard = FetchInnerWritable::Done {
31 chunk: chunk.clone(),
32 };
33
34 return Ready(Ok(chunk));
35 }
36
37 let peer_groups = net.get_peer_groups().into();
38
39 *guard = FetchInnerWritable::Fetching {
40 net: net.clone(), peer_groups,
42 promises: Vec::new(),
43 };
44 }
45
46 let FetchInnerWritable::Fetching {
47 net,
48 peer_groups,
49 promises,
50 } = &mut *guard
51 else {
52 unreachable!("This state is logically impossible, all variants have been exhausted.");
53 };
54
55 for mut promise in std::mem::take(promises) {
56 match promise.poll(cx) {
57 Pending => promises.push(promise),
58 Ready(Ok(chunk)) => {
59 net.upsert_put(chunk.clone());
60
61 net.write().fetches.remove(&self.hash);
63
64 *guard = FetchInnerWritable::Done {
65 chunk: chunk.clone(),
66 };
67
68 drop(guard); return Ready(Ok(chunk));
71 }
72 Ready(Err(_)) => (),
73 }
74 }
75
76 if let Some(peer_group) = peer_groups.pop_front() {
77 let mut promise = peer_group.get(&self.hash);
78
79 promise.ready(cx);
81
82 promises.push(promise);
83
84 return Pending;
85 }
86
87 if promises.is_empty() {
88 Ready(Err(FetchError::OptionsExhausted(self.hash.clone())))
89 } else {
90 Pending
91 }
92 }
93}