use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
task::{Context, Poll},
};
use crate::{
error::KumoError,
logging::{event, target},
spider::Spider,
};
use super::builder::CrawlEngine;
/// Item store that hands items to a Tokio mpsc channel instead of persisting them.
///
/// `CrawlEngine::stream` installs one of these as the engine's store so that
/// every stored item ends up on the receiving half of the channel.
pub(super) struct ChannelStore {
/// Sending half of the channel; each stored item is cloned and sent through it.
tx: tokio::sync::mpsc::Sender<serde_json::Value>,
/// Shared flag set to `true` when an item could not be delivered to the channel.
cancelled: Arc<AtomicBool>,
}
impl ChannelStore {
pub(super) fn new(
tx: tokio::sync::mpsc::Sender<serde_json::Value>,
cancelled: Arc<AtomicBool>,
) -> Self {
Self { tx, cancelled }
}
}
#[async_trait::async_trait]
impl crate::store::ItemStore for ChannelStore {
    /// Sends a copy of `item` into the channel.
    ///
    /// A closed channel is not reported as an error: the shared `cancelled`
    /// flag is raised instead and `Ok(())` is returned, so this method never
    /// returns `Err`.
    async fn store(&self, item: &serde_json::Value) -> Result<(), KumoError> {
        // When the receiving half is already gone `send` is guaranteed to fail,
        // so skip the deep clone of the JSON value and the await entirely.
        let delivered = !self.tx.is_closed() && self.tx.send(item.clone()).await.is_ok();
        if !delivered {
            self.cancelled.store(true, Ordering::Relaxed);
        }
        Ok(())
    }
}
impl CrawlEngine {
    /// Starts the crawl on a background Tokio task and returns its items as a stream.
    ///
    /// The engine's store is replaced with a [`ChannelStore`] backed by a bounded
    /// channel of `stream_buffer` slots (at least one). The returned
    /// [`ItemStream`] wraps the receiving half of that channel.
    ///
    /// The spawned task is detached. If `run` fails, the error is logged through
    /// `tracing` and is not surfaced to the caller. The shared cancellation flag
    /// is handed to the engine via `stream_cancelled`; `ChannelStore` raises it
    /// when an item can no longer be delivered.
    ///
    /// Must be awaited inside a Tokio runtime because it calls `tokio::spawn`.
    ///
    /// # Errors
    ///
    /// The visible code path always returns `Ok`.
    pub async fn stream<S>(self, spider: S) -> Result<ItemStream, KumoError>
    where
        S: Spider + 'static,
    {
        // `mpsc::channel` panics on a zero capacity, so clamp to a single slot
        // rather than letting a misconfigured buffer size abort the caller.
        let capacity = self.stream_buffer.max(1);
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);

        let cancelled = Arc::new(AtomicBool::new(false));
        let mut engine = self.store(ChannelStore::new(tx, Arc::clone(&cancelled)));
        engine.stream_cancelled = Some(cancelled);

        tokio::spawn(async move {
            if let Err(e) = engine.run(spider).await {
                tracing::error!(
                    target: target::CRAWL,
                    event = event::CRAWL_STREAM_ERROR,
                    error = %e,
                    error_kind = e.kind().as_str(),
                    "crawl.stream_error"
                );
            }
        });

        Ok(ItemStream::new(rx))
    }
}
/// Stream of `serde_json::Value` items received from the channel that
/// `CrawlEngine::stream` creates.
pub struct ItemStream {
/// Receiving half of the item channel, wrapped so it can be polled as a stream.
inner: tokio_stream::wrappers::ReceiverStream<serde_json::Value>,
}
impl ItemStream {
pub(super) fn new(rx: tokio::sync::mpsc::Receiver<serde_json::Value>) -> Self {
Self {
inner: tokio_stream::wrappers::ReceiverStream::new(rx),
}
}
}
impl tokio_stream::Stream for ItemStream {
    type Item = serde_json::Value;

    /// Delegates polling to the wrapped receiver stream and returns its result unchanged.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Re-pin the inner stream, then forward the poll through the trait.
        let receiver = std::pin::Pin::new(&mut self.inner);
        tokio_stream::Stream::poll_next(receiver, cx)
    }
}