shitpost_markov 1.0.1

Backend for training a Markov bot at compile-time.
Documentation
use std::{
	pin::Pin,
	sync::{Arc, Mutex},
	task::{Context, Poll},
};

use actson::{
	tokio::AsyncBufReaderJsonFeeder, JsonEvent, JsonParser,
};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use reqwest::{Error as RequestError, RequestBuilder};
use tokio::{
	io::{
		AsyncRead, BufReader, Error as IOError, ReadBuf,
		Result as IOResult,
	},
	sync::mpsc::UnboundedSender,
};

use crate::{
	tokens::{self, Tokens},
	Error,
};

/// Bookkeeping carried across JSON events while `process_posts` walks the
/// response body.
#[derive(Debug, Default)]
struct ParseState {
	/// Where the parser currently is relative to the objects and fields it
	/// cares about.
	state: State,
	/// Set once an `"id"` string has been sent over the channel; later `"id"`
	/// field names are then ignored.
	id_found: bool,
}

/// Position of the streaming JSON parser as tracked by `process_posts`.
#[derive(Debug, Default)]
enum State {
	/// No object start has been seen yet.
	#[default]
	Start,
	/// Number of JSON objects currently open. Only object start/end events
	/// change it; arrays are not counted.
	Depth(u8),
	/// An `"id"` field name was seen at depth 1; its string value is expected
	/// next.
	ID,
	/// A `"content"` field name was seen at depth 1; its string value is
	/// expected next.
	Content,
}

/// Adapter that exposes a stream of byte chunks through `AsyncRead`.
struct AsyncStreamReader<T> {
	/// The wrapped chunk stream.
	stream: T,
	/// Tail of a chunk that did not fit into the caller's buffer on the
	/// previous read; it is handed out before the stream is polled again.
	overflow: Option<Bytes>,
}

impl<T> AsyncRead for AsyncStreamReader<T>
where
	T: Stream<Item = Result<Bytes, RequestError>> + Unpin,
{
	/// Copies the next available bytes into `buf`.
	///
	/// Bytes left over from a previous chunk are served first; otherwise the
	/// wrapped stream is polled for a new chunk. Whatever does not fit into
	/// `buf` is kept in `overflow` for the next call.
	///
	/// Returns `Poll::Pending` while the stream has nothing ready, and
	/// `Poll::Ready(Ok(()))` without writing anything once the stream has
	/// ended. Stream errors are converted with `IOError::other`.
	fn poll_read(
		self: Pin<&mut Self>,
		cx: &mut Context<'_>,
		buf: &mut ReadBuf<'_>,
	) -> Poll<IOResult<()>> {
		let self_ = self.get_mut();

		// Nothing can be written, so do not pull a chunk we could not deliver.
		let rem = buf.remaining();
		if rem == 0 {
			return Poll::Ready(Ok(()));
		}

		let pending = self_.overflow.take().filter(|d| !d.is_empty());
		let data = match pending {
			Some(data) => data,
			// Keep polling past empty chunks: returning `Ready(Ok(()))`
			// without writing any bytes tells the caller the input has
			// ended, which must only happen when the stream yields `None`.
			None => loop {
				match self_.stream.poll_next_unpin(cx) {
					Poll::Pending => return Poll::Pending,
					Poll::Ready(None) => return Poll::Ready(Ok(())),
					Poll::Ready(Some(chunk)) => {
						let chunk = chunk.map_err(IOError::other)?;
						if !chunk.is_empty() {
							break chunk;
						}
					},
				}
			},
		};

		// Hand out as much as fits and stash the remainder, if any.
		let n = data.len().min(rem);
		buf.put_slice(&data[..n]);
		if n < data.len() {
			self_.overflow = Some(data.slice(n..));
		}

		Poll::Ready(Ok(()))
	}
}

pub async fn process_posts(
	rq: RequestBuilder,
	min_id: String,
	send: UnboundedSender<Result<Option<String>, Error>>,
	tokens: Arc<Mutex<Tokens>>,
) {
	let stream = match rq.query(&[("min_id", &min_id)]).send().await {
		Ok(s) => s.bytes_stream(),
		Err(e) => {
			send.send(Err(Error::RequestError(e)))
				.expect("Sending channel closed unexpectedly.");
			return;
		},
	};
	let stream = AsyncStreamReader {
		stream,
		overflow: None,
	};
	let reader = BufReader::new(stream);
	let feeder = AsyncBufReaderJsonFeeder::new(reader);
	let mut parser = JsonParser::new(feeder);

	let mut state = ParseState::default();

	let mut parse_result = parser.next_event();
	while let Ok(Some(event)) = parse_result {
		match (event, &mut state.state) {
			(JsonEvent::NeedMoreInput, _) => {
				if let Err(e) = parser.feeder.fill_buf().await {
					send.send(Err(e.into())).expect(
						"Sending channel closed unexpectedly.",
					);
				}
			},
			(JsonEvent::StartObject, State::Start) => {
				state.state = State::Depth(1)
			},
			(JsonEvent::StartObject, State::Depth(ref mut depth)) => {
				*depth += 1
			},
			(JsonEvent::EndObject, State::Depth(ref mut depth)) => {
				*depth -= 1
			},
			(JsonEvent::FieldName, State::Depth(1)) => {
				match parser.current_str() {
					Ok("content") => state.state = State::Content,
					Ok("id") if !state.id_found => {
						state.state = State::ID
					},
					_ => {},
				}
			},
			(JsonEvent::ValueString, State::ID) => {
				match parser.current_str() {
					Ok(id) => {
						send.send(Ok(Some(id.to_string()))).expect(
							"Sending channel closed unexpectedly.",
						);
						state = ParseState {
							id_found: true,
							state: State::Depth(1),
						};
					},
					Err(e) => send.send(Err(e.into())).expect(
						"Sending channel closed unexpectedly.",
					),
				}
			},
			(JsonEvent::ValueString, State::Content) => {
				match parser.current_str() {
					Ok(content) => {
						tokens::tokenise(content, &tokens);
						state.state = State::Depth(1);
					},
					Err(e) => send.send(Err(e.into())).expect(
						"Sending channel closed unexpectedly.",
					),
				}
			},
			_ => {},
		}
		parse_result = parser.next_event();
	}
	if let Err(e) = parse_result {
		send.send(Err(e.into()))
			.expect("Sending channel closed unexpectedly.");
	}

	if let State::Start = state.state {
		send.send(Ok(None))
			.expect("Sending channel closed unexpectedly.");
	}
}