scatter_net/scatter_net/interaction/methods/
read_packet.rs1use std::{
2 array::TryFromSliceError,
3 num::TryFromIntError,
4 task::{
5 Context,
6 Poll::{Pending, Ready},
7 },
8};
9
10use iroh::endpoint::ReadError;
11use n0_future::FutureExt;
12use ps_buffer::BufferError;
13
14use crate::{Interaction, Packet, PacketFromNetBufferError};
15
16impl Interaction {
17 pub fn read_packet(&self, cx: &mut Context<'_>) -> InteractionReadPacketResult {
18 let mut guard = self.write();
19
20 let mut recv_stream = match Box::pin(self.recv_stream.lock()).poll(cx) {
21 Pending => return Err(InteractionReadPacketError::RecvStreamLocked),
22 Ready(recv_stream) => recv_stream,
23 };
24
25 loop {
26 let mut current_offset = guard.buffer.len();
27
28 let capacity = {
29 if current_offset >= 4 {
30 let capacity_bytes = &guard.buffer[0..4];
31 let capacity = u32::from_be_bytes(capacity_bytes.try_into()?);
32
33 usize::try_from(capacity)?
34 } else {
35 guard.buffer.capacity().max(0x1000)
36 }
37 };
38
39 guard.buffer.set_len(capacity)?;
40
41 let result = recv_stream.poll_read(cx, &mut guard.buffer[current_offset..]);
42
43 match result {
44 Pending => {
45 guard.buffer.truncate(current_offset);
46
47 drop(recv_stream);
48
49 return guard.packets.pop_front().map_or_else(
50 || Ok(InteractionReadPacket::Waiting),
51 |packet| Ok(InteractionReadPacket::Packet(packet)),
52 );
53 }
54
55 Ready(Err(err)) => {
56 guard.buffer.truncate(current_offset);
57
58 drop(recv_stream);
59
60 return match guard.packets.pop_front() {
61 Some(packet) => Ok(InteractionReadPacket::Packet(packet)),
62 None => Err(err)?,
63 };
64 }
65
66 Ready(Ok(bytes_read)) => {
67 current_offset += bytes_read;
68 guard.buffer.truncate(current_offset);
69
70 while let Some((size, packet)) = Packet::from_net_buffer(&guard.buffer)? {
71 guard.packets.push_back(packet);
72 guard.buffer.copy_within(size..current_offset, 0);
73 current_offset -= size;
74 guard.buffer.truncate(current_offset);
75 }
76
77 if bytes_read == 0 {
78 return guard.packets.pop_front().map_or_else(
79 || Ok(InteractionReadPacket::EOF),
80 |packet| Ok(InteractionReadPacket::Packet(packet)),
81 );
82 }
83 }
84 }
85 }
86 }
87}
88
89#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
90pub enum InteractionReadPacket {
91 EOF,
92 Packet(Packet),
93 Waiting,
94}
95
96#[derive(thiserror::Error, Debug)]
97pub enum InteractionReadPacketError {
98 #[error(transparent)]
99 Buffer(#[from] BufferError),
100 #[error(transparent)]
101 Parse(#[from] PacketFromNetBufferError),
102 #[error(transparent)]
103 Read(#[from] ReadError),
104 #[error("The mutex on this interaction's recv_stream was locked.")]
105 RecvStreamLocked,
106 #[error(transparent)]
107 TryFromInt(#[from] TryFromIntError),
108 #[error(transparent)]
109 TryFromSlice(#[from] TryFromSliceError),
110}
111
112pub type InteractionReadPacketResult = Result<InteractionReadPacket, InteractionReadPacketError>;