scatter_net/scatter_net/interaction/implementations/
async_store.rs1use std::sync::Arc;
2
3use n0_future::StreamExt;
4use ps_datachunk::{DataChunk, OwnedDataChunk};
5use ps_hash::Hash;
6use ps_hkey::{AsyncStore, PsHkeyError};
7use ps_promise::{Promise, PromiseRejection};
8
9use crate::{Interaction, InteractionReadPacketError, InteractionSendPacketError, Packet};
10
11impl AsyncStore for Interaction {
12 type Chunk = OwnedDataChunk;
13 type Error = InteractionAsyncStoreError;
14
15 fn get(&self, hash: &Hash) -> Promise<Self::Chunk, Self::Error> {
16 let hash = hash.to_string();
17 let mut interaction = self.clone();
18
19 Promise::new(async move {
20 interaction
21 .send_packet(crate::Packet::FetchRequest(crate::FetchRequest {
22 hash: hash.to_string(),
23 }))
24 .await?;
25
26 let packet = match interaction.next().await {
27 None => Err(InteractionAsyncStoreError::NoResponse)?,
28 Some(Ok(packet)) => packet,
29 Some(Err(err)) => Err(err)?,
30 };
31
32 if let Packet::FetchResponse(response) = packet {
33 match response {
34 crate::FetchResponse::Error(err) => {
35 Err(InteractionAsyncStoreError::ReturnedError(err))?
36 }
37 crate::FetchResponse::NotFound => Err(InteractionAsyncStoreError::NotFound)?,
38 crate::FetchResponse::Success(bytes) => Ok(OwnedDataChunk::from_data(bytes)?),
39 crate::FetchResponse::Suggest(node_id) => {
40 let _ = interaction.get_peer().net().connect_to(node_id, None).await;
42
43 Err(InteractionAsyncStoreError::NotFound)
44 }
45 }
46 } else {
47 Err(InteractionAsyncStoreError::InvalidResponse)
48 }
49 })
50 }
51
52 fn put_encrypted<C: DataChunk>(&self, chunk: C) -> Promise<(), Self::Error> {
53 let chunk = chunk.into_owned();
54 let hash = chunk.hash();
55 let mut interaction = self.clone();
56
57 Promise::new(async move {
58 interaction
59 .send_packet(crate::Packet::PutRequest(crate::PutRequest {
60 data: chunk.into_bytes(),
61 }))
62 .await?;
63
64 let packet = match interaction.next().await {
65 None => return Err(InteractionAsyncStoreError::NoResponse)?,
66 Some(Ok(packet)) => packet,
67 Some(Err(err)) => Err(err)?,
68 };
69
70 if let Packet::PutResponse(response) = packet {
71 match response {
72 crate::PutResponse::Failure => Err(InteractionAsyncStoreError::PutFailed),
73 crate::PutResponse::LimitExceeded => {
74 Err(InteractionAsyncStoreError::LimitExceeded)
75 }
76 crate::PutResponse::Success(hkey) => {
77 if hkey.as_bytes() == hash.as_bytes() {
78 Ok(())
79 } else {
80 Err(InteractionAsyncStoreError::HkeyMismatch {
81 expected: hash,
82 received: hkey,
83 })
84 }
85 }
86 }
87 } else {
88 Err(InteractionAsyncStoreError::InvalidResponse)
89 }
90 })
91 }
92}
93
94#[derive(thiserror::Error, Debug)]
95pub enum InteractionAsyncStoreError {
96 #[error(transparent)]
97 DataChunk(#[from] ps_datachunk::PsDataChunkError),
98 #[error(transparent)]
99 Hkey(#[from] PsHkeyError),
100 #[error("The Peer returned an unexpected Hkey.")]
101 HkeyMismatch {
102 expected: Arc<Hash>,
103 received: String,
104 },
105 #[error("Invalid response")]
106 InvalidResponse,
107 #[error("Exceeded usage limits")]
108 LimitExceeded,
109 #[error("Stream ended without response.")]
110 NoResponse,
111 #[error("Chunk not found")]
112 NotFound,
113 #[error("Promise already consumed.")]
114 PromiseAlreadyConsumed,
115 #[error("Peer responded to PutRequest with Failure.")]
116 PutFailed,
117 #[error(transparent)]
118 ReadPacket(#[from] InteractionReadPacketError),
119 #[error("Peer returned an Error.")]
120 ReturnedError(String),
121 #[error(transparent)]
122 SendPacket(#[from] InteractionSendPacketError),
123}
124
125impl PromiseRejection for InteractionAsyncStoreError {
126 fn already_consumed() -> Self {
127 Self::PromiseAlreadyConsumed
128 }
129}