companies_house_api/streaming/
stream_connection.rs1use bytes::BytesMut;
2use std::{str::from_utf8, time::Duration};
3use tokio::time::timeout;
4
5use crate::streaming::operation::StreamItem;
6
7use super::{error::CompaniesHouseStreamingNextError, operation::CompaniesHouseStreamingOperation};
8
9pub struct StreamConnection<O: CompaniesHouseStreamingOperation> {
10 pub(super) buffer: BytesMut,
11 pub(super) response: reqwest::Response,
12 pub(super) max_chunk_timeout: Duration,
13 pub(super) _operation: O,
14}
15
16impl<O: CompaniesHouseStreamingOperation> StreamConnection<O> {
17 pub async fn next(&mut self) -> Result<StreamItem<O::Data>, CompaniesHouseStreamingNextError> {
18 loop {
19 if let Some((index, _)) = self.buffer.iter().enumerate().find(|(_, c)| **c == b'\n') {
20 let left = self.buffer.split_to(index + 1);
21 let str = from_utf8(&left)
22 .map_err(CompaniesHouseStreamingNextError::BadItemEncoding)?
23 .trim();
24 if str.is_empty() {
25 log::trace!("Buffer contains empty line");
26 } else {
27 log::trace!(length = str.len(); "Buffer contains next stream item");
28
29 let value_err = match serde_json::from_str(str) {
30 Ok(data) => return Ok(data),
31 Err(err) => err,
32 };
33
34 if let Ok(value) = serde_json::from_str(str) {
35 return Err(CompaniesHouseStreamingNextError::BadItemData {
36 inner: value_err,
37 value,
38 });
39 };
40
41 return Err(CompaniesHouseStreamingNextError::BadItemJson {
42 inner: value_err,
43 text: str.to_owned(),
44 });
45 }
46 }
47
48 log::trace!("Buffer contains no items, reading next response chunk");
49
50 let Some(chunk) = timeout(self.max_chunk_timeout, self.response.chunk())
51 .await
52 .map_err(|_| {
53 log::info!("Chunk timeout exceeded");
54 CompaniesHouseStreamingNextError::ChunkTimeout
55 })?
56 .map_err(CompaniesHouseStreamingNextError::BadChunk)?
57 else {
58 return Err(CompaniesHouseStreamingNextError::StreamComplete);
59 };
60
61 log::trace!(bytes = chunk.len(); "Stream chunk received");
62
63 if self.buffer.is_empty() && chunk.len() == 1 && chunk[0] == b'\n' {
64 log::info!("Heartbeat chunk received");
65 } else {
66 self.buffer.extend_from_slice(&chunk);
67 }
68 }
69 }
70}