scatter_net/scatter_net/interaction/methods/
read_packet.rs

1use std::{
2    array::TryFromSliceError,
3    num::TryFromIntError,
4    task::{
5        Context,
6        Poll::{Pending, Ready},
7    },
8};
9
10use iroh::endpoint::ReadError;
11use n0_future::FutureExt;
12use ps_buffer::BufferError;
13
14use crate::{Interaction, Packet, PacketFromNetBufferError};
15
16impl Interaction {
17    pub fn read_packet(&self, cx: &mut Context<'_>) -> InteractionReadPacketResult {
18        let mut guard = self.write();
19
20        let mut recv_stream = match Box::pin(self.recv_stream.lock()).poll(cx) {
21            Pending => return Err(InteractionReadPacketError::RecvStreamLocked),
22            Ready(recv_stream) => recv_stream,
23        };
24
25        loop {
26            let mut current_offset = guard.buffer.len();
27
28            let capacity = {
29                if current_offset >= 4 {
30                    let capacity_bytes = &guard.buffer[0..4];
31                    let capacity = u32::from_be_bytes(capacity_bytes.try_into()?);
32
33                    usize::try_from(capacity)?
34                } else {
35                    guard.buffer.capacity().max(0x1000)
36                }
37            };
38
39            guard.buffer.set_len(capacity)?;
40
41            let result = recv_stream.poll_read(cx, &mut guard.buffer[current_offset..]);
42
43            match result {
44                Pending => {
45                    guard.buffer.truncate(current_offset);
46
47                    drop(recv_stream);
48
49                    return guard.packets.pop_front().map_or_else(
50                        || Ok(InteractionReadPacket::Waiting),
51                        |packet| Ok(InteractionReadPacket::Packet(packet)),
52                    );
53                }
54
55                Ready(Err(err)) => {
56                    guard.buffer.truncate(current_offset);
57
58                    drop(recv_stream);
59
60                    return match guard.packets.pop_front() {
61                        Some(packet) => Ok(InteractionReadPacket::Packet(packet)),
62                        None => Err(err)?,
63                    };
64                }
65
66                Ready(Ok(bytes_read)) => {
67                    current_offset += bytes_read;
68                    guard.buffer.truncate(current_offset);
69
70                    while let Some((size, packet)) = Packet::from_net_buffer(&guard.buffer)? {
71                        guard.packets.push_back(packet);
72                        guard.buffer.copy_within(size..current_offset, 0);
73                        current_offset -= size;
74                        guard.buffer.truncate(current_offset);
75                    }
76
77                    if bytes_read == 0 {
78                        return guard.packets.pop_front().map_or_else(
79                            || Ok(InteractionReadPacket::EOF),
80                            |packet| Ok(InteractionReadPacket::Packet(packet)),
81                        );
82                    }
83                }
84            }
85        }
86    }
87}
88
89#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
90pub enum InteractionReadPacket {
91    EOF,
92    Packet(Packet),
93    Waiting,
94}
95
96#[derive(thiserror::Error, Debug)]
97pub enum InteractionReadPacketError {
98    #[error(transparent)]
99    Buffer(#[from] BufferError),
100    #[error(transparent)]
101    Parse(#[from] PacketFromNetBufferError),
102    #[error(transparent)]
103    Read(#[from] ReadError),
104    #[error("The mutex on this interaction's recv_stream was locked.")]
105    RecvStreamLocked,
106    #[error(transparent)]
107    TryFromInt(#[from] TryFromIntError),
108    #[error(transparent)]
109    TryFromSlice(#[from] TryFromSliceError),
110}
111
112pub type InteractionReadPacketResult = Result<InteractionReadPacket, InteractionReadPacketError>;