scatter_net/scatter_net/interaction/implementations/
async_store.rs

1use std::sync::Arc;
2
3use n0_future::StreamExt;
4use ps_datachunk::{DataChunk, OwnedDataChunk};
5use ps_hash::Hash;
6use ps_hkey::{AsyncStore, PsHkeyError};
7use ps_promise::{Promise, PromiseRejection};
8
9use crate::{Interaction, InteractionReadPacketError, InteractionSendPacketError, Packet};
10
11impl AsyncStore for Interaction {
12    type Chunk = OwnedDataChunk;
13    type Error = InteractionAsyncStoreError;
14
15    fn get(&self, hash: &Hash) -> Promise<Self::Chunk, Self::Error> {
16        let hash = hash.to_string();
17        let mut interaction = self.clone();
18
19        Promise::new(async move {
20            interaction
21                .send_packet(crate::Packet::FetchRequest(crate::FetchRequest {
22                    hash: hash.to_string(),
23                }))
24                .await?;
25
26            let packet = match interaction.next().await {
27                None => Err(InteractionAsyncStoreError::NoResponse)?,
28                Some(Ok(packet)) => packet,
29                Some(Err(err)) => Err(err)?,
30            };
31
32            if let Packet::FetchResponse(response) = packet {
33                match response {
34                    crate::FetchResponse::Error(err) => {
35                        Err(InteractionAsyncStoreError::ReturnedError(err))?
36                    }
37                    crate::FetchResponse::NotFound => Err(InteractionAsyncStoreError::NotFound)?,
38                    crate::FetchResponse::Success(bytes) => Ok(OwnedDataChunk::from_data(bytes)?),
39                    crate::FetchResponse::Suggest(node_id) => {
40                        // TODO connect to node
41                        let _ = interaction.get_peer().net().connect_to(node_id, None).await;
42
43                        Err(InteractionAsyncStoreError::NotFound)
44                    }
45                }
46            } else {
47                Err(InteractionAsyncStoreError::InvalidResponse)
48            }
49        })
50    }
51
52    fn put_encrypted<C: DataChunk>(&self, chunk: C) -> Promise<(), Self::Error> {
53        let chunk = chunk.into_owned();
54        let hash = chunk.hash();
55        let mut interaction = self.clone();
56
57        Promise::new(async move {
58            interaction
59                .send_packet(crate::Packet::PutRequest(crate::PutRequest {
60                    data: chunk.into_bytes(),
61                }))
62                .await?;
63
64            let packet = match interaction.next().await {
65                None => return Err(InteractionAsyncStoreError::NoResponse)?,
66                Some(Ok(packet)) => packet,
67                Some(Err(err)) => Err(err)?,
68            };
69
70            if let Packet::PutResponse(response) = packet {
71                match response {
72                    crate::PutResponse::Failure => Err(InteractionAsyncStoreError::PutFailed),
73                    crate::PutResponse::LimitExceeded => {
74                        Err(InteractionAsyncStoreError::LimitExceeded)
75                    }
76                    crate::PutResponse::Success(hkey) => {
77                        if hkey.as_bytes() == hash.as_bytes() {
78                            Ok(())
79                        } else {
80                            Err(InteractionAsyncStoreError::HkeyMismatch {
81                                expected: hash,
82                                received: hkey,
83                            })
84                        }
85                    }
86                }
87            } else {
88                Err(InteractionAsyncStoreError::InvalidResponse)
89            }
90        })
91    }
92}
93
94#[derive(thiserror::Error, Debug)]
95pub enum InteractionAsyncStoreError {
96    #[error(transparent)]
97    DataChunk(#[from] ps_datachunk::PsDataChunkError),
98    #[error(transparent)]
99    Hkey(#[from] PsHkeyError),
100    #[error("The Peer returned an unexpected Hkey.")]
101    HkeyMismatch {
102        expected: Arc<Hash>,
103        received: String,
104    },
105    #[error("Invalid response")]
106    InvalidResponse,
107    #[error("Exceeded usage limits")]
108    LimitExceeded,
109    #[error("Stream ended without response.")]
110    NoResponse,
111    #[error("Chunk not found")]
112    NotFound,
113    #[error("Promise already consumed.")]
114    PromiseAlreadyConsumed,
115    #[error("Peer responded to PutRequest with Failure.")]
116    PutFailed,
117    #[error(transparent)]
118    ReadPacket(#[from] InteractionReadPacketError),
119    #[error("Peer returned an Error.")]
120    ReturnedError(String),
121    #[error(transparent)]
122    SendPacket(#[from] InteractionSendPacketError),
123}
124
125impl PromiseRejection for InteractionAsyncStoreError {
126    fn already_consumed() -> Self {
127        Self::PromiseAlreadyConsumed
128    }
129}