companies_house_api/streaming/
stream_connection.rs

1use bytes::BytesMut;
2use std::{str::from_utf8, time::Duration};
3use tokio::time::timeout;
4
5use crate::streaming::operation::StreamItem;
6
7use super::{error::CompaniesHouseStreamingNextError, operation::CompaniesHouseStreamingOperation};
8
9pub struct StreamConnection<O: CompaniesHouseStreamingOperation> {
10    pub(super) buffer: BytesMut,
11    pub(super) response: reqwest::Response,
12    pub(super) max_chunk_timeout: Duration,
13    pub(super) _operation: O,
14}
15
16impl<O: CompaniesHouseStreamingOperation> StreamConnection<O> {
17    pub async fn next(&mut self) -> Result<StreamItem<O::Data>, CompaniesHouseStreamingNextError> {
18        loop {
19            if let Some((index, _)) = self.buffer.iter().enumerate().find(|(_, c)| **c == b'\n') {
20                let left = self.buffer.split_to(index + 1);
21                let str = from_utf8(&left)
22                    .map_err(CompaniesHouseStreamingNextError::BadItemEncoding)?
23                    .trim();
24                if str.is_empty() {
25                    log::trace!("Buffer contains empty line");
26                } else {
27                    log::trace!(length = str.len(); "Buffer contains next stream item");
28
29                    let value_err = match serde_json::from_str(str) {
30                        Ok(data) => return Ok(data),
31                        Err(err) => err,
32                    };
33
34                    if let Ok(value) = serde_json::from_str(str) {
35                        return Err(CompaniesHouseStreamingNextError::BadItemData {
36                            inner: value_err,
37                            value,
38                        });
39                    };
40
41                    return Err(CompaniesHouseStreamingNextError::BadItemJson {
42                        inner: value_err,
43                        text: str.to_owned(),
44                    });
45                }
46            }
47
48            log::trace!("Buffer contains no items, reading next response chunk");
49
50            let Some(chunk) = timeout(self.max_chunk_timeout, self.response.chunk())
51                .await
52                .map_err(|_| {
53                    log::info!("Chunk timeout exceeded");
54                    CompaniesHouseStreamingNextError::ChunkTimeout
55                })?
56                .map_err(CompaniesHouseStreamingNextError::BadChunk)?
57            else {
58                return Err(CompaniesHouseStreamingNextError::StreamComplete);
59            };
60
61            log::trace!(bytes = chunk.len(); "Stream chunk received");
62
63            if self.buffer.is_empty() && chunk.len() == 1 && chunk[0] == b'\n' {
64                log::info!("Heartbeat chunk received");
65            } else {
66                self.buffer.extend_from_slice(&chunk);
67            }
68        }
69    }
70}