use std::{
collections::VecDeque,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
};
use bloomfilter::Bloom;
use tokio::sync::Mutex;
use crate::{
error::KumoError,
request::{CrawlRequest, FrontierRequest, StoredFrontierRequest},
};
use super::Frontier;
/// Initial value of `FileFrontier::flush_every`: the number of pushes between automatic
/// flushes to disk unless overridden.
const DEFAULT_FLUSH_EVERY: usize = 100;
/// Item count passed to `Bloom::new_for_fp_rate` when sizing the seen-key Bloom filter.
const BLOOM_CAPACITY: usize = 1_000_000;
/// A crawl frontier whose pending queue and seen-key list are persisted as JSON files.
///
/// State is kept in memory behind async mutexes and written to `queue.json` and
/// `seen.json` inside `dir`, either periodically (every `flush_every` pushes) or on an
/// explicit flush.
pub struct FileFrontier {
    /// Pending requests; new entries are pushed to the back and popped from the front.
    queue: Mutex<VecDeque<FrontierRequest>>,
    /// Bloom filter over recorded dedup keys, consulted before scanning `seen_exact`.
    seen_bloom: Mutex<Bloom<String>>,
    /// Exact list of recorded dedup keys; confirms Bloom hits and is what gets
    /// persisted to `seen.json`.
    seen_exact: Mutex<Vec<String>>,
    /// Directory that holds `queue.json` and `seen.json`.
    dir: PathBuf,
    /// Number of pushes between automatic flushes to disk.
    flush_every: usize,
    /// Running count of successful pushes (filtered and forced), used to decide when
    /// an automatic flush is due.
    push_count: AtomicUsize,
}
/// Manual `Debug` that prints only the configuration fields (`dir`, `flush_every`).
///
/// The mutex-guarded state (queue, Bloom filter, seen list) and the push counter are
/// deliberately left out, so the output is marked as non-exhaustive.
impl std::fmt::Debug for FileFrontier {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileFrontier")
            .field("dir", &self.dir)
            .field("flush_every", &self.flush_every)
            // Emits a trailing `..` so readers can tell fields were omitted.
            .finish_non_exhaustive()
    }
}
impl FileFrontier {
    /// Opens a file-backed frontier rooted at `dir`, creating the directory if needed.
    ///
    /// If `seen.json` and/or `queue.json` are present in `dir`, their contents are
    /// loaded: every key in `seen.json` is re-inserted into a fresh Bloom filter, and
    /// every stored request in `queue.json` is converted back into a `FrontierRequest`.
    /// A missing file simply yields empty state.
    ///
    /// # Errors
    ///
    /// Returns a `KumoError` when the directory cannot be created, when an existing
    /// state file cannot be read or parsed, or when a stored request fails to convert.
    /// Read failures other than "not found" (for example, permission denied) are
    /// reported rather than treated as "no saved state", so a later flush cannot
    /// silently overwrite state that merely could not be read.
    ///
    /// # Panics
    ///
    /// Panics if the Bloom filter cannot be built from the built-in parameters.
    pub fn open(dir: impl Into<PathBuf>) -> Result<Self, KumoError> {
        let dir = dir.into();
        std::fs::create_dir_all(&dir).map_err(|e| KumoError::store("create frontier dir", e))?;

        let mut bloom =
            Bloom::new_for_fp_rate(BLOOM_CAPACITY, 0.001).expect("valid bloom filter parameters");

        let seen_exact: Vec<String> = match Self::read_if_present(&dir.join("seen.json"))
            .map_err(|e| KumoError::store("read seen.json", e))?
        {
            Some(data) => {
                let urls: Vec<String> = serde_json::from_str(&data)
                    .map_err(|e| KumoError::store("parse seen.json", e))?;
                // Rebuild the in-memory Bloom filter from the persisted exact list.
                for url in &urls {
                    bloom.set(url);
                }
                urls
            }
            None => Vec::new(),
        };

        let queue: VecDeque<FrontierRequest> = match Self::read_if_present(&dir.join("queue.json"))
            .map_err(|e| KumoError::store("read queue.json", e))?
        {
            Some(data) => {
                let stored: VecDeque<StoredFrontierRequest> = serde_json::from_str(&data)
                    .map_err(|e| KumoError::store("parse queue.json", e))?;
                stored
                    .into_iter()
                    .map(FrontierRequest::try_from)
                    .collect::<Result<VecDeque<FrontierRequest>, _>>()
                    .map_err(KumoError::store_msg)?
            }
            None => VecDeque::new(),
        };

        Ok(Self {
            queue: Mutex::new(queue),
            seen_bloom: Mutex::new(bloom),
            seen_exact: Mutex::new(seen_exact),
            dir,
            flush_every: DEFAULT_FLUSH_EVERY,
            push_count: AtomicUsize::new(0),
        })
    }

    /// Builder-style setter for the number of pushes between automatic flushes.
    ///
    /// The push paths flush when the running push count is a multiple of `n`; a
    /// positive count is never a multiple of zero, so `n == 0` leaves only explicit
    /// flushes.
    pub fn flush_every(mut self, n: usize) -> Self {
        self.flush_every = n;
        self
    }

    /// Reads `path` as UTF-8 text, mapping a missing file to `Ok(None)`.
    ///
    /// Any other I/O error is returned to the caller unchanged.
    fn read_if_present(path: &std::path::Path) -> std::io::Result<Option<String>> {
        match std::fs::read_to_string(path) {
            Ok(data) => Ok(Some(data)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Replaces the file at `path` with `contents` via a sibling `<name>.tmp` file
    /// followed by a rename.
    ///
    /// An interrupted write therefore leaves the previous file intact instead of a
    /// truncated one. No `fsync` is issued, so this guards against interrupted writes
    /// rather than guaranteeing durability across power loss.
    fn write_atomically(path: &std::path::Path, contents: &str) -> std::io::Result<()> {
        let mut tmp_name = path.as_os_str().to_owned();
        tmp_name.push(".tmp");
        let tmp_path = PathBuf::from(tmp_name);
        std::fs::write(&tmp_path, contents)?;
        std::fs::rename(&tmp_path, path)
    }

    /// Serializes the queue and the seen list and writes them to `queue.json` and
    /// `seen.json` under `dir`.
    ///
    /// Both locks (queue first, then seen) are held for the whole operation, so the
    /// two files are taken from one consistent snapshot and concurrent flushes cannot
    /// interleave their writes or share a temp file.
    ///
    /// # Errors
    ///
    /// Returns a `KumoError` if serialization or either file write fails.
    async fn flush_to_disk(&self) -> Result<(), KumoError> {
        let queue = self.queue.lock().await;
        let seen = self.seen_exact.lock().await;

        let stored_queue: VecDeque<StoredFrontierRequest> =
            queue.iter().map(StoredFrontierRequest::from).collect();
        // Serialize both documents before touching the disk so a serialization
        // failure leaves the existing files untouched.
        let queue_json = serde_json::to_string(&stored_queue)
            .map_err(|e| KumoError::store("serialize queue", e))?;
        let seen_json =
            serde_json::to_string(&*seen).map_err(|e| KumoError::store("serialize seen", e))?;

        Self::write_atomically(&self.dir.join("queue.json"), &queue_json)
            .map_err(|e| KumoError::store("write queue.json", e))?;
        Self::write_atomically(&self.dir.join("seen.json"), &seen_json)
            .map_err(|e| KumoError::store("write seen.json", e))?;
        Ok(())
    }

    /// Writes the current queue and seen list to disk immediately.
    ///
    /// # Errors
    ///
    /// Returns a `KumoError` if serialization or either file write fails.
    pub async fn flush(&self) -> Result<(), KumoError> {
        self.flush_to_disk().await
    }
}
impl FileFrontier {
    /// Counts one enqueue and, when the running count is a multiple of `flush_every`,
    /// writes the current state to disk.
    ///
    /// The periodic flush is best-effort: a failure is discarded here. Callers that
    /// need to observe persistence errors should call `flush` explicitly.
    async fn record_push_and_maybe_flush(&self) {
        let count = self.push_count.fetch_add(1, Ordering::Relaxed) + 1;
        if count.is_multiple_of(self.flush_every) {
            self.flush_to_disk().await.ok();
        }
    }
}

#[async_trait::async_trait]
impl Frontier for FileFrontier {
    /// Enqueues a GET request for `url` at `depth`, subject to deduplication.
    ///
    /// Returns `false` when the request was rejected as already seen.
    async fn push(&self, url: String, depth: usize) -> bool {
        self.push_request(CrawlRequest::get(url), depth).await
    }

    /// Enqueues a GET request for `url` with the given `depth` and `retry_count`,
    /// bypassing deduplication.
    async fn push_force(&self, url: String, depth: usize, retry_count: u32) {
        self.push_request_force(FrontierRequest::new(
            CrawlRequest::get(url),
            depth,
            retry_count,
        ))
        .await;
    }

    /// Pops the next request and returns it as `(url, depth, retry_count)`, or `None`
    /// when the queue is empty.
    async fn pop(&self) -> Option<(String, usize, u32)> {
        self.pop_request().await.map(|queued| {
            (
                queued.request().url().to_string(),
                queued.depth(),
                queued.retry_count(),
            )
        })
    }

    /// Enqueues `request` at `depth` with a retry count of zero.
    ///
    /// Unless the request has `dont_filter` enabled, its dedup key is checked against
    /// the Bloom filter and, on a Bloom hit, confirmed against the exact seen list.
    /// Returns `false` (and enqueues nothing) when the key was already recorded;
    /// otherwise records the key, enqueues the request and returns `true`.
    async fn push_request(&self, request: CrawlRequest, depth: usize) -> bool {
        if !request.dont_filter_enabled() {
            let url = request.dedup_key().to_string();

            // Hold the exact-list lock across the Bloom check-and-set. If the Bloom
            // lock were released before the exact list is locked, two concurrent
            // pushes of the same key could both pass: one sets the Bloom bits, the
            // other sees a Bloom hit but an exact list that does not yet contain the
            // key, and both end up recording and enqueueing it.
            let mut seen = self.seen_exact.lock().await;
            let maybe_seen = {
                let mut bloom = self.seen_bloom.lock().await;
                let hit = bloom.check(&url);
                if !hit {
                    bloom.set(&url);
                }
                hit
            };

            // A Bloom hit may be a false positive, so only the exact list can reject.
            if maybe_seen && seen.iter().any(|seen_url| seen_url == &url) {
                return false;
            }
            seen.push(url);
        }

        self.queue
            .lock()
            .await
            .push_back(FrontierRequest::new(request, depth, 0));
        self.record_push_and_maybe_flush().await;
        true
    }

    /// Enqueues an already-built `FrontierRequest` without any deduplication.
    async fn push_request_force(&self, queued: FrontierRequest) {
        self.queue.lock().await.push_back(queued);
        self.record_push_and_maybe_flush().await;
    }

    /// Removes and returns the request at the front of the queue, if any.
    async fn pop_request(&self) -> Option<FrontierRequest> {
        self.queue.lock().await.pop_front()
    }

    /// Returns the number of requests currently queued in memory.
    async fn len(&self) -> usize {
        self.queue.lock().await.len()
    }

    /// Writes the current queue and seen list to disk.
    ///
    /// # Errors
    ///
    /// Returns a `KumoError` if serialization or either file write fails.
    async fn flush(&self) -> Result<(), KumoError> {
        self.flush_to_disk().await
    }
}