scatter_net/scatter_net/fetch/implementations/
future.rs

1use std::{
2    future::Future,
3    task::Poll::{Pending, Ready},
4};
5
6use n0_future::FutureExt;
7use ps_datachunk::{DataChunk, OwnedDataChunk};
8use ps_hkey::AsyncStore;
9
10use crate::{Fetch, FetchError, FetchInnerWritable};
11
12impl Future for Fetch {
13    type Output = Result<OwnedDataChunk, FetchError>;
14
15    fn poll(
16        self: std::pin::Pin<&mut Self>,
17        cx: &mut std::task::Context<'_>,
18    ) -> std::task::Poll<Self::Output> {
19        let mut guard = self.write();
20
21        if let FetchInnerWritable::Done { chunk } = &*guard {
22            return Ready(Ok(chunk.clone()));
23        }
24
25        if let FetchInnerWritable::Initial { net } = &*guard {
26            // try getting chunk locally
27            if let Ok(chunk) = net.lake.get_encrypted_chunk(&self.hash) {
28                let chunk = chunk.into_owned();
29
30                *guard = FetchInnerWritable::Done {
31                    chunk: chunk.clone(),
32                };
33
34                return Ready(Ok(chunk));
35            }
36
37            let peer_groups = net.get_peer_groups().into();
38
39            *guard = FetchInnerWritable::Fetching {
40                net: net.clone(), // TODO optimize out this clone eventually
41                peer_groups,
42                promises: Vec::new(),
43            };
44        }
45
46        let FetchInnerWritable::Fetching {
47            net,
48            peer_groups,
49            promises,
50        } = &mut *guard
51        else {
52            unreachable!("This state is logically impossible, all variants have been exhausted.");
53        };
54
55        for mut promise in std::mem::take(promises) {
56            match promise.poll(cx) {
57                Pending => promises.push(promise),
58                Ready(Ok(chunk)) => {
59                    net.upsert_put(chunk.clone());
60
61                    // drop cached reference
62                    net.write().fetches.remove(&self.hash);
63
64                    *guard = FetchInnerWritable::Done {
65                        chunk: chunk.clone(),
66                    };
67
68                    drop(guard); // allow Future to be polled again
69
70                    return Ready(Ok(chunk));
71                }
72                Ready(Err(_)) => (),
73            }
74        }
75
76        if let Some(peer_group) = peer_groups.pop_front() {
77            let mut promise = peer_group.get(&self.hash);
78
79            // poll the Future synchronously (send request)
80            promise.ready(cx);
81
82            promises.push(promise);
83
84            return Pending;
85        }
86
87        if promises.is_empty() {
88            Ready(Err(FetchError::OptionsExhausted(self.hash.clone())))
89        } else {
90            Pending
91        }
92    }
93}