#![cfg(test)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, TimeZone, Utc};
use crate::sources::{BackfillCheckpoint, Companions, FetchPage, SourceConnector, SourceDocument};
/// Builds a deterministic [`SourceDocument`] fixture for tests.
///
/// The document gets:
/// - `id` and `partition_key` copied from the arguments,
/// - a single `"title"` field whose value is `"Doc {id}"`,
/// - a fixed `updated_at` of 2024-06-01 10:00:00 UTC,
/// - a `source_link` of `https://example.com/{id}`.
///
/// # Panics
/// Panics if the hard-coded timestamp does not resolve to a single instant.
pub fn make_source_doc(id: &str, partition_key: &str) -> SourceDocument {
    // Derived values are computed up front so the struct literal stays flat.
    let updated_at = Utc.with_ymd_and_hms(2024, 6, 1, 10, 0, 0).single().unwrap();
    let source_link = format!("https://example.com/{id}");
    let title = format!("Doc {id}");

    let mut doc = SourceDocument {
        id: id.to_owned(),
        partition_key: partition_key.to_owned(),
        fields: HashMap::new(),
        updated_at,
        source_link,
    };
    doc.fields.insert("title".into(), title.into());
    doc
}
/// One scripted successful response for `MockConnector::fetch_window`.
///
/// `fetch_window` copies both fields into the returned `FetchPage` and
/// always sets that page's `last_seen` to `None`.
pub struct WindowPage {
/// Documents returned for this page.
pub documents: Vec<SourceDocument>,
/// Token handed back as the page's `next_page_token`; `None` is passed
/// through unchanged.
pub next_page_token: Option<String>,
}
/// One scripted successful response for `MockConnector::fetch_backfill_page`.
///
/// `fetch_backfill_page` copies both fields into the returned `FetchPage`
/// and always sets that page's `next_page_token` to `None`.
pub struct BackfillPage {
/// Documents returned for this page.
pub documents: Vec<SourceDocument>,
/// Checkpoint handed back as the page's `last_seen`; `None` is passed
/// through unchanged.
pub last_seen: Option<BackfillCheckpoint>,
}
/// Scripted outcome for `MockConnector::list_all_ids`.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be printed
/// and compared directly in test assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListIdsResponse {
    /// Succeed, returning these ids.
    Ok(Vec<String>),
    /// Fail with an error carrying this message.
    Err(String),
}
/// Scripted `SourceConnector` implementation for tests.
///
/// Responses are queued or set through the inherent `push_*` / `set_*`
/// methods and handed out by the `SourceConnector` methods.
///
/// All scripted state sits behind `Arc<Mutex<..>>`, so a clone shares the
/// same queues and slots as the value it was cloned from; only the two name
/// strings are copied.
#[derive(Clone)]
pub struct MockConnector {
/// Value returned by `source_name()`.
pub source_name: String,
/// Value returned by `primary_container()`.
pub primary_container: String,
// Responses for `fetch_window`, consumed from the front (index 0).
window_pages: Arc<Mutex<Vec<Result<WindowPage, String>>>>,
// Responses for `fetch_backfill_page`, consumed from the front (index 0).
backfill_pages: Arc<Mutex<Vec<Result<BackfillPage, String>>>>,
// Response for `list_all_ids`; `None` means nothing has been scripted.
list_ids: Arc<Mutex<Option<ListIdsResponse>>>,
// Value for `fetch_companions`; moved out (`take`) when it is fetched.
companions: Arc<Mutex<Option<Companions>>>,
}
impl MockConnector {
pub fn new(source_name: &str, primary_container: &str) -> Self {
Self {
source_name: source_name.to_string(),
primary_container: primary_container.to_string(),
window_pages: Arc::new(Mutex::new(Vec::new())),
backfill_pages: Arc::new(Mutex::new(Vec::new())),
list_ids: Arc::new(Mutex::new(None)),
companions: Arc::new(Mutex::new(None)),
}
}
pub fn push_window_page(&self, docs: Vec<SourceDocument>, next_page_token: Option<String>) {
self.window_pages.lock().unwrap().push(Ok(WindowPage {
documents: docs,
next_page_token,
}));
}
pub fn push_window_error(&self, msg: impl Into<String>) {
self.window_pages.lock().unwrap().push(Err(msg.into()));
}
pub fn push_backfill_page(
&self,
docs: Vec<SourceDocument>,
last_seen: Option<BackfillCheckpoint>,
) {
self.backfill_pages.lock().unwrap().push(Ok(BackfillPage {
documents: docs,
last_seen,
}));
}
pub fn push_backfill_error(&self, msg: impl Into<String>) {
self.backfill_pages.lock().unwrap().push(Err(msg.into()));
}
pub fn set_list_ids(&self, ids: Vec<String>) {
*self.list_ids.lock().unwrap() = Some(ListIdsResponse::Ok(ids));
}
pub fn set_list_ids_error(&self, msg: impl Into<String>) {
*self.list_ids.lock().unwrap() = Some(ListIdsResponse::Err(msg.into()));
}
pub fn set_companions(&self, companions: Companions) {
*self.companions.lock().unwrap() = Some(companions);
}
}
impl SourceConnector for MockConnector {
fn source_type(&self) -> &str {
"mock"
}
fn source_name(&self) -> &str {
&self.source_name
}
fn subsources(&self) -> &[String] {
&[]
}
fn primary_container(&self) -> &str {
&self.primary_container
}
async fn fetch_window(
&self,
_subsource: &str,
_window_start: DateTime<Utc>,
_window_end: DateTime<Utc>,
_batch_size: usize,
_page_token: Option<&str>,
) -> anyhow::Result<FetchPage> {
let mut pages = self.window_pages.lock().unwrap();
match pages.first() {
None => {
Ok(FetchPage {
documents: vec![],
next_page_token: None,
last_seen: None,
})
}
Some(Ok(_)) => {
let page = match pages.remove(0) {
Ok(p) => p,
Err(_) => unreachable!(),
};
Ok(FetchPage {
documents: page.documents,
next_page_token: page.next_page_token,
last_seen: None,
})
}
Some(Err(_)) => {
let err = match pages.remove(0) {
Err(e) => e,
Ok(_) => unreachable!(),
};
Err(anyhow::anyhow!("{err}"))
}
}
}
async fn fetch_backfill_page(
&self,
_subsource: &str,
_backfill_target: DateTime<Utc>,
_last_seen: Option<&BackfillCheckpoint>,
_batch_size: usize,
) -> anyhow::Result<FetchPage> {
let mut pages = self.backfill_pages.lock().unwrap();
match pages.first() {
None => Ok(FetchPage {
documents: vec![],
next_page_token: None,
last_seen: None,
}),
Some(Ok(_)) => {
let page = match pages.remove(0) {
Ok(p) => p,
Err(_) => unreachable!(),
};
Ok(FetchPage {
documents: page.documents,
next_page_token: None,
last_seen: page.last_seen,
})
}
Some(Err(_)) => {
let err = match pages.remove(0) {
Err(e) => e,
Ok(_) => unreachable!(),
};
Err(anyhow::anyhow!("{err}"))
}
}
}
async fn list_all_ids(&self, _subsource: &str) -> anyhow::Result<Vec<String>> {
match self.list_ids.lock().unwrap().as_ref() {
Some(ListIdsResponse::Ok(ids)) => Ok(ids.clone()),
Some(ListIdsResponse::Err(e)) => Err(anyhow::anyhow!("{e}")),
None => Ok(vec![]),
}
}
async fn fetch_companions(&self, _subsource: &str) -> anyhow::Result<Companions> {
Ok(self.companions.lock().unwrap().take().unwrap_or_default())
}
}