use std::marker::PhantomData;
use countio::Counter;
use quick_xml::{events, Reader};
use url::Url;
use crate::{
attribute as attr,
parse::XmlParser,
record::{EntryRecord, IndexRecord},
Error,
};
/// A parser selected by looking at a sitemap document's root element.
///
/// `Index` wraps a parser for `<sitemapindex>` documents; `Entry` wraps a parser for
/// `<urlset>` documents.
pub(crate) enum SitemapDetector<R> {
    /// The document's root is `<sitemapindex>`; yields [`IndexRecord`]s.
    Index(XmlParser<R, IndexRecord>),
    /// The document's root is `<urlset>`; yields [`EntryRecord`]s.
    Entry(XmlParser<R, EntryRecord>),
}
impl<R> SitemapDetector<R> {
    /// Classifies a single XML event by the root element it opens.
    ///
    /// Returns `Some(true)` for a `<sitemapindex>` start tag and `Some(false)` for a
    /// `<urlset>` start tag; both names are compared ASCII case-insensitively. Any other
    /// event yields `None`. The function currently never produces `Err`.
    fn is_index(event: events::Event) -> Result<Option<bool>, Error> {
        let start = match event {
            events::Event::Start(start) => start,
            _ => return Ok(None),
        };

        let tag = start.name().into_inner();
        let verdict = if tag.eq_ignore_ascii_case(attr::SITEMAP_INDEX.as_bytes()) {
            Some(true)
        } else if tag.eq_ignore_ascii_case(attr::URL_SET.as_bytes()) {
            Some(false)
        } else {
            None
        };
        Ok(verdict)
    }

    /// Wraps `reader` in the parser matching the detected root element.
    ///
    /// `is_index == true` builds an index parser rooted at `attr::SITEMAP_INDEX`; otherwise an
    /// entry parser rooted at `attr::URL_SET` is built.
    fn create(is_index: bool, reader: Reader<Counter<R>>) -> Self {
        match is_index {
            true => Self::Index(XmlParser::from_wrapper(reader, attr::SITEMAP_INDEX)),
            false => Self::Entry(XmlParser::from_wrapper(reader, attr::URL_SET)),
        }
    }
}
impl<R: std::io::BufRead> SitemapDetector<R> {
pub fn from_sync(reader: R) -> Result<Self, Error> {
let mut reader = Reader::from_reader(Counter::new(reader));
let mut buf = Vec::new();
loop {
let event = reader.read_event_into(&mut buf)?;
if let Some(is_index) = Self::is_index(event)? {
return Ok(Self::create(is_index, reader));
}
}
}
}
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl<R: tokio::io::AsyncBufRead + Unpin + Send> SitemapDetector<R> {
    /// Asynchronously reads `reader` up to the document's root element and returns the
    /// matching parser.
    ///
    /// Events that precede the root are skipped. A `<sitemapindex>` start tag yields
    /// `SitemapDetector::Index`; a `<urlset>` start tag yields `SitemapDetector::Entry`.
    ///
    /// # Errors
    ///
    /// Returns an error if the XML is malformed, or if the input ends before either root
    /// element has been seen (for example an empty or non-sitemap response body).
    pub async fn from_async(reader: R) -> Result<Self, Error> {
        let mut reader = Reader::from_reader(Counter::new(reader));
        let mut buf = Vec::new();
        loop {
            let event = reader.read_event_into_async(&mut buf).await?;
            // quick_xml keeps returning `Eof` once the input is exhausted, so without this
            // check a document lacking a sitemap root element would spin here forever.
            if matches!(event, events::Event::Eof) {
                let eof = std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "no <urlset> or <sitemapindex> root element found",
                );
                return Err(quick_xml::Error::from(eof).into());
            }
            if let Some(is_index) = Self::is_index(event)? {
                return Ok(Self::create(is_index, reader));
            }
            // `read_event_into_async` appends to `buf`; drop the bytes of the event just handled.
            buf.clear();
        }
    }
}
/// Streams [`EntryRecord`]s from a pool of sitemap URLs, descending into sitemap indexes.
///
/// Type parameters:
/// - `R`: the reader type produced for each fetched sitemap.
/// - `E`: the caller's error type, which must be constructible from this crate's `Error`.
/// - `O`: what `fetcher` returns (a `Result<R, E>` for the sync API, a future of one for the
///   async API).
/// - `F`: the fetcher, called with a sitemap URL to obtain its content.
pub struct AutoParser<R, E, O, F>
where
    F: Fn(Url) -> O,
{
    /// Sitemap URLs that have not been fetched yet.
    sitemaps: Vec<Url>,
    /// Active parser over a `<sitemapindex>` document, if any.
    index: Option<XmlParser<R, IndexRecord>>,
    /// Active parser over a `<urlset>` document, if any.
    entry: Option<XmlParser<R, EntryRecord>>,
    /// Ties the caller's error type to the struct without storing a value of it.
    error_type: PhantomData<E>,
    /// Callback used to obtain the content behind a sitemap URL.
    fetcher: F,
}
impl<R, E: std::error::Error + From<Error>, O, F: Fn(Url) -> O> AutoParser<R, E, O, F> {
    /// Builds a parser whose pool is an owned copy of `sitemaps`, with no active parser.
    fn new(sitemaps: &[Url], fetcher: F) -> Self {
        let pool = Vec::from(sitemaps);
        Self {
            fetcher,
            error_type: PhantomData,
            entry: None,
            index: None,
            sitemaps: pool,
        }
    }

    /// Stores the parser carried by `detector` in its matching slot, replacing (and
    /// dropping) whatever parser previously occupied that slot.
    fn save_parser(&mut self, detector: SitemapDetector<R>) {
        match detector {
            SitemapDetector::Index(index_parser) => {
                self.index = Some(index_parser);
            }
            SitemapDetector::Entry(entry_parser) => {
                self.entry = Some(entry_parser);
            }
        }
    }
}
impl<R: std::io::BufRead, E: std::error::Error + From<Error>, F: Fn(Url) -> Result<R, E>>
    AutoParser<R, E, Result<R, E>, F>
{
    /// Creates a parser that fetches sitemaps synchronously.
    ///
    /// `sitemaps` is the initial pool of sitemap or sitemap-index URLs. `fetcher` is called
    /// with each URL (including the children listed in an index) to obtain a reader over it.
    pub fn new_sync(sitemaps: &[Url], fetcher: F) -> Self {
        Self::new(sitemaps, fetcher)
    }

    /// Returns the next entry record, or `Ok(None)` once all sitemaps are exhausted.
    ///
    /// URL-set documents yield their entries directly. For a sitemap index, every listed
    /// child sitemap is fetched in turn and its entries are yielded. Indexes nested inside
    /// an index are not followed.
    ///
    /// # Errors
    ///
    /// Returns an error if fetching a sitemap fails, if its root element cannot be detected,
    /// or if parsing a record fails. The sitemap or index record that failed is abandoned,
    /// so a later call resumes with the remaining work.
    pub fn read_sync(&mut self) -> Result<Option<EntryRecord>, E> {
        use crate::parse::Parser;

        while !self.sitemaps.is_empty() || self.index.is_some() || self.entry.is_some() {
            if self.index.is_none() && self.entry.is_none() {
                let sitemap = self.sitemaps.pop().expect("not empty");
                let reader = (self.fetcher)(sitemap)?;
                let detector = SitemapDetector::from_sync(reader)?;
                self.save_parser(detector);
            }

            if let Some(parser) = &mut self.entry {
                let result = parser.read();
                match result {
                    Ok(Some(record)) => return Ok(Some(record)),
                    Ok(None) => self.entry = None,
                    Err(e) => {
                        self.entry = None;
                        return Err(e.into());
                    }
                }
            }

            // Reached only when no entry parser is active: pull the next child sitemap from
            // the index. The index parser is kept until it is exhausted so that *every*
            // listed sitemap is visited, not just the first one.
            if let Some(parser) = &mut self.index {
                let result = parser.read();
                let location = match result {
                    Ok(Some(record)) => record.location().clone(),
                    Ok(None) => {
                        self.index = None;
                        continue;
                    }
                    Err(e) => {
                        self.index = None;
                        return Err(e.into());
                    }
                };
                let reader = (self.fetcher)(location)?;
                match SitemapDetector::from_sync(reader)? {
                    // Nested sitemap indexes are not followed.
                    SitemapDetector::Index(_) => {}
                    sitemap => self.save_parser(sitemap),
                }
            }
        }
        Ok(None)
    }
}
impl<R: std::io::BufRead, E: std::error::Error + From<Error>, F: Fn(Url) -> Result<R, E>> Iterator
    for AutoParser<R, E, Result<R, E>, F>
{
    type Item = EntryRecord;

    /// Yields the next entry record, skipping sitemaps that fail to fetch or parse.
    ///
    /// Returning `None` on the first error would silently end a `for` loop while sitemaps
    /// remain queued. Instead, errors are skipped: each error returned by `read_sync`
    /// abandons the sitemap or index record that failed, so retrying always makes progress.
    /// Call `read_sync` directly to observe the errors.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.read_sync() {
                Ok(record) => return record,
                Err(_) => continue,
            }
        }
    }
}
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
impl<
        R: tokio::io::AsyncBufRead + Unpin + Send,
        E: std::error::Error + From<Error>,
        A: std::future::Future<Output = Result<R, E>>,
        F: Fn(Url) -> A,
    > AutoParser<R, E, A, F>
{
    /// Creates a parser that fetches sitemaps asynchronously.
    ///
    /// `sitemaps` is the initial pool of sitemap or sitemap-index URLs. `fetcher` is called
    /// with each URL (including the children listed in an index) and its future is awaited
    /// to obtain a reader over it.
    pub fn new_async(sitemaps: &[Url], fetcher: F) -> Self {
        Self::new(sitemaps, fetcher)
    }

    /// Returns the next entry record, or `Ok(None)` once all sitemaps are exhausted.
    ///
    /// URL-set documents yield their entries directly. For a sitemap index, every listed
    /// child sitemap is fetched in turn and its entries are yielded. Indexes nested inside
    /// an index are not followed.
    ///
    /// # Errors
    ///
    /// Returns an error if fetching a sitemap fails, if its root element cannot be detected,
    /// or if parsing a record fails. The sitemap or index record that failed is abandoned,
    /// so a later call resumes with the remaining work.
    pub async fn read_async(&mut self) -> Result<Option<EntryRecord>, E> {
        use crate::parse::AsyncParser;

        while !self.sitemaps.is_empty() || self.index.is_some() || self.entry.is_some() {
            if self.index.is_none() && self.entry.is_none() {
                let sitemap = self.sitemaps.pop().expect("not empty");
                let reader = (self.fetcher)(sitemap).await?;
                let detector = SitemapDetector::from_async(reader).await?;
                self.save_parser(detector);
            }

            if let Some(parser) = &mut self.entry {
                let result = parser.read().await;
                match result {
                    Ok(Some(record)) => return Ok(Some(record)),
                    Ok(None) => self.entry = None,
                    Err(e) => {
                        self.entry = None;
                        return Err(e.into());
                    }
                }
            }

            // Reached only when no entry parser is active: pull the next child sitemap from
            // the index. The index parser is kept until it is exhausted so that *every*
            // listed sitemap is visited, not just the first one.
            if let Some(parser) = &mut self.index {
                let result = parser.read().await;
                let location = match result {
                    Ok(Some(record)) => record.location().clone(),
                    Ok(None) => {
                        self.index = None;
                        continue;
                    }
                    Err(e) => {
                        self.index = None;
                        return Err(e.into());
                    }
                };
                let reader = (self.fetcher)(location).await?;
                match SitemapDetector::from_async(reader).await? {
                    // Nested sitemap indexes are not followed.
                    SitemapDetector::Index(_) => {}
                    sitemap => self.save_parser(sitemap),
                }
            }
        }
        Ok(None)
    }
}
impl<R, E: std::error::Error + From<Error>, O, F: Fn(Url) -> O> std::fmt::Debug
    for AutoParser<R, E, O, F>
{
    /// Summarizes the parser's progress.
    ///
    /// Prints how many sitemap URLs remain queued (`pool`) and whether an index and/or entry
    /// parser is currently active. The fetcher and the parsers themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Use the real type name so debug output matches what users see in signatures.
        f.debug_struct("AutoParser")
            .field("pool", &self.sitemaps.len())
            .field("index", &self.index.is_some())
            .field("entry", &self.entry.is_some())
            .finish()
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Caller-side error type wrapping this crate's `Error`, as `AutoParser` requires.
    #[derive(Debug, thiserror::Error)]
    enum CustomError {
        #[error("sitemap error: {0}")]
        Sitemap(#[from] Error),
    }

    /// An empty pool never calls the fetcher and immediately reports exhaustion through
    /// both `read_sync` and the `Iterator` impl.
    #[test]
    fn synk() -> Result<(), Error> {
        type SyncReader = std::io::BufReader<std::io::Cursor<Vec<u8>>>;
        fn sync_fetcher(_: Url) -> Result<SyncReader, CustomError> {
            unreachable!()
        }
        let mut parser = AutoParser::new_sync(&[], sync_fetcher);
        assert!(matches!(parser.read_sync(), Ok(None)));
        assert!(parser.next().is_none());
        Ok(())
    }

    /// An empty pool never calls the async fetcher and immediately reports exhaustion.
    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn asynk() -> Result<(), Error> {
        type AsyncReader = tokio::io::BufReader<std::io::Cursor<Vec<u8>>>;
        async fn async_fetcher(_: Url) -> Result<AsyncReader, CustomError> {
            unreachable!()
        }
        let mut parser = AutoParser::new_async(&[], async_fetcher);
        assert!(matches!(parser.read_async().await, Ok(None)));
        Ok(())
    }
}