/// Shared, async-guarded map from episode id to its cached episode.
/// (Declared before the `use` items below; Rust item scoping makes the
/// referenced names resolve regardless of ordering.)
type EpisodeMap = tokio::sync::RwLock<HashMap<Uuid, Arc<Episode>>>;
use crate::Result;
use crate::episode::Episode;
use crate::pattern::Pattern;
use chrono::{TimeZone, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info};
use uuid::Uuid;
mod tag_queries;
pub use tag_queries::{TagStats, get_all_tags, get_tag_statistics, list_episodes_by_tags};
/// Per-backend cap on episodes fetched by `get_all_episodes`.
/// NOTE(review): presumably `None` would mean "no limit" — confirm against
/// the `query_episodes_since` contract before relying on that.
const GET_ALL_EPISODES_LIMIT: Option<usize> = Some(1000);
pub async fn get_all_episodes(
episodes_fallback: &tokio::sync::RwLock<HashMap<Uuid, Arc<Episode>>>,
cache_storage: Option<&Arc<dyn crate::StorageBackend>>,
turso_storage: Option<&Arc<dyn crate::StorageBackend>>,
) -> Result<Vec<Episode>> {
let mut all_episodes: HashMap<Uuid, Arc<Episode>> = {
let episodes = episodes_fallback.read().await;
episodes
.iter()
.map(|(id, ep)| (*id, Arc::clone(ep)))
.collect()
};
if let Some(cache) = cache_storage {
let since = Utc
.timestamp_millis_opt(0)
.single()
.unwrap_or_else(Utc::now);
match cache
.query_episodes_since(since, GET_ALL_EPISODES_LIMIT)
.await
{
Ok(cache_episodes) => {
debug!(
cache_count = cache_episodes.len(),
"Fetched episodes from cache storage"
);
for episode in cache_episodes {
all_episodes
.entry(episode.episode_id)
.or_insert_with(|| Arc::new(episode));
}
}
Err(e) => {
debug!("Failed to fetch episodes from cache storage: {}", e);
}
}
}
if let Some(turso) = turso_storage {
let since = Utc
.timestamp_millis_opt(0)
.single()
.unwrap_or_else(Utc::now);
match turso
.query_episodes_since(since, GET_ALL_EPISODES_LIMIT)
.await
{
Ok(turso_episodes) => {
debug!(
turso_count = turso_episodes.len(),
"Fetched episodes from durable storage"
);
for episode in turso_episodes {
all_episodes
.entry(episode.episode_id)
.or_insert_with(|| Arc::new(episode));
}
}
Err(e) => {
debug!("Failed to fetch episodes from durable storage: {}", e);
}
}
}
{
let mut episodes_cache = episodes_fallback.write().await;
for (id, episode) in &all_episodes {
if !episodes_cache.contains_key(id) {
episodes_cache.insert(*id, Arc::clone(episode));
}
}
}
let total_count = all_episodes.len();
info!(
total_episodes = total_count,
"Retrieved all episodes from all storage backends"
);
Ok(all_episodes
.into_values()
.map(|arc_ep| (*arc_ep).clone())
.collect())
}
/// Returns a point-in-time snapshot of every pattern in the in-memory
/// fallback map. Each pattern is cloned so the lock is released before the
/// caller touches the data.
pub async fn get_all_patterns(
    patterns_fallback: &tokio::sync::RwLock<HashMap<crate::episode::PatternId, Pattern>>,
) -> Result<Vec<Pattern>> {
    let guard = patterns_fallback.read().await;
    let snapshot: Vec<Pattern> = guard.values().cloned().collect();
    Ok(snapshot)
}
/// Lists episodes across all backends, newest first by `start_time`.
///
/// `completed_only == Some(true)` keeps only complete episodes; any other
/// value (including `None`) keeps everything. `offset` skips that many
/// episodes from the front of the sorted list and `limit` caps the result.
pub async fn list_episodes(
    episodes_fallback: &tokio::sync::RwLock<HashMap<Uuid, Arc<Episode>>>,
    cache_storage: Option<&Arc<dyn crate::StorageBackend>>,
    turso_storage: Option<&Arc<dyn crate::StorageBackend>>,
    limit: Option<usize>,
    offset: Option<usize>,
    completed_only: Option<bool>,
) -> Result<Vec<Episode>> {
    let mut episodes =
        get_all_episodes(episodes_fallback, cache_storage, turso_storage).await?;

    if completed_only == Some(true) {
        episodes.retain(|e| e.is_complete());
    }

    // Newest first: descending start_time (stable sort, same order as
    // sorting by Reverse(start_time)).
    episodes.sort_by(|a, b| b.start_time.cmp(&a.start_time));

    // Apply pagination lazily: skip == offset, take == limit (unbounded when
    // no limit was given).
    let page: Vec<Episode> = episodes
        .into_iter()
        .skip(offset.unwrap_or(0))
        .take(limit.unwrap_or(usize::MAX))
        .collect();

    info!(
        total_returned = page.len(),
        "Listed episodes with filters"
    );
    Ok(page)
}
/// Lists episodes matching an advanced `EpisodeFilter`, newest first by
/// `start_time`, with the same offset/limit paging as `list_episodes`.
pub async fn list_episodes_filtered(
    episodes_fallback: &EpisodeMap,
    cache_storage: Option<&Arc<dyn crate::StorageBackend>>,
    turso_storage: Option<&Arc<dyn crate::StorageBackend>>,
    filter: super::filters::EpisodeFilter,
    limit: Option<usize>,
    offset: Option<usize>,
) -> Result<Vec<Episode>> {
    let everything = get_all_episodes(episodes_fallback, cache_storage, turso_storage).await?;

    // Delegate the actual predicate logic to the filter, then order newest
    // first (descending start_time; stable, equivalent to Reverse-key sort).
    let mut matching = filter.apply(everything);
    matching.sort_by(|a, b| b.start_time.cmp(&a.start_time));

    // skip == offset, take == limit (unbounded when no limit was given).
    let page: Vec<Episode> = matching
        .into_iter()
        .skip(offset.unwrap_or(0))
        .take(limit.unwrap_or(usize::MAX))
        .collect();

    info!(
        total_returned = page.len(),
        "Listed episodes with advanced filters"
    );
    Ok(page)
}
/// Returns the patterns associated with a specific episode.
///
/// NOTE(review): currently a stub — it ignores both arguments and always
/// returns an empty `Vec`. The parameters keep the API shape stable for when
/// per-episode pattern lookup is actually implemented (hence the
/// `unused_async` allow: the signature stays `async` for forward
/// compatibility with callers).
#[allow(clippy::unused_async)]
pub async fn get_episode_patterns(
    _episode_id: Uuid,
    _patterns_fallback: &tokio::sync::RwLock<HashMap<crate::episode::PatternId, Pattern>>,
) -> Result<Vec<Pattern>> {
    Ok(vec![])
}
/// Reports whether a durable (Turso) storage backend has been configured.
pub fn has_turso_storage(turso_storage: &Option<Arc<dyn crate::StorageBackend>>) -> bool {
    matches!(turso_storage, Some(_))
}
/// Reports whether a cache storage backend has been configured.
pub fn has_cache_storage(cache_storage: &Option<Arc<dyn crate::StorageBackend>>) -> bool {
    matches!(cache_storage, Some(_))
}
/// Borrows the configured durable (Turso) storage backend, if any.
pub fn turso_storage(
    turso_storage: &Option<Arc<dyn crate::StorageBackend>>,
) -> Option<&Arc<dyn crate::StorageBackend>> {
    turso_storage.as_ref()
}
/// Borrows the configured cache storage backend, if any.
pub fn cache_storage(
    cache_storage: &Option<Arc<dyn crate::StorageBackend>>,
) -> Option<&Arc<dyn crate::StorageBackend>> {
    cache_storage.as_ref()
}
/// Retrieves a specific set of episodes by id, in three passes: the
/// in-memory map first, then cache storage, then durable (Turso) storage for
/// whatever is still missing. Episodes found in a backend are written back to
/// the in-memory map for future lookups.
///
/// Ids that cannot be found in any backend are silently omitted from the
/// result. Return order is unspecified (HashMap iteration order).
///
/// # Errors
///
/// Currently always returns `Ok`: individual backend failures are logged at
/// debug level and the id is treated as missing from that backend.
pub async fn get_episodes_by_ids(
    episode_ids: &[Uuid],
    episodes_fallback: &tokio::sync::RwLock<HashMap<Uuid, Arc<Episode>>>,
    cache_storage: Option<&Arc<dyn crate::StorageBackend>>,
    turso_storage: Option<&Arc<dyn crate::StorageBackend>>,
) -> Result<Vec<Episode>> {
    if episode_ids.is_empty() {
        return Ok(vec![]);
    }
    let mut found_episodes: HashMap<Uuid, Episode> = HashMap::new();
    let mut missing_ids: Vec<Uuid> = Vec::new();

    // Pass 1: in-memory map (cheapest; read lock released before any await
    // on a backend).
    {
        let episodes = episodes_fallback.read().await;
        for &id in episode_ids {
            if let Some(ep) = episodes.get(&id) {
                found_episodes.insert(id, (**ep).clone());
            } else {
                missing_ids.push(id);
            }
        }
    }
    debug!(
        requested = episode_ids.len(),
        found_in_memory = found_episodes.len(),
        missing = missing_ids.len(),
        "Bulk episode lookup: checked memory cache"
    );
    if missing_ids.is_empty() {
        return Ok(found_episodes.into_values().collect());
    }

    // Pass 2: cache storage. A query error counts as "not in cache" so the
    // durable backend still gets a chance at that id.
    if let Some(cache) = cache_storage {
        let mut still_missing = Vec::new();
        for id in &missing_ids {
            match cache.get_episode(*id).await {
                Ok(Some(episode)) => {
                    found_episodes.insert(*id, episode);
                }
                Ok(None) => {
                    still_missing.push(*id);
                }
                Err(e) => {
                    debug!(episode_id = %id, error = %e, "Failed to query cache storage");
                    still_missing.push(*id);
                }
            }
        }
        missing_ids = still_missing;
        debug!(
            found_in_cache = found_episodes.len(),
            still_missing = missing_ids.len(),
            "Bulk episode lookup: checked cache storage"
        );
    }
    if missing_ids.is_empty() {
        // Everything resolved without touching durable storage; warm the
        // in-memory map and return early.
        write_back_to_memory(episodes_fallback, &found_episodes).await;
        return Ok(found_episodes.into_values().collect());
    }

    // Pass 3: durable storage for whatever is still missing.
    if let Some(turso) = turso_storage {
        for id in &missing_ids {
            match turso.get_episode(*id).await {
                Ok(Some(episode)) => {
                    found_episodes.insert(*id, episode);
                }
                // Genuinely absent everywhere; nothing more to try.
                Ok(None) => {}
                Err(e) => {
                    debug!(episode_id = %id, error = %e, "Failed to query durable storage");
                }
            }
        }
        debug!(
            total_found = found_episodes.len(),
            total_requested = episode_ids.len(),
            "Bulk episode lookup: checked durable storage"
        );
    }

    write_back_to_memory(episodes_fallback, &found_episodes).await;
    info!(
        requested = episode_ids.len(),
        found = found_episodes.len(),
        "Completed bulk episode retrieval"
    );
    Ok(found_episodes.into_values().collect())
}

/// Inserts any of `found_episodes` not already present into the in-memory
/// fallback map (existing entries are never overwritten).
async fn write_back_to_memory(
    episodes_fallback: &tokio::sync::RwLock<HashMap<Uuid, Arc<Episode>>>,
    found_episodes: &HashMap<Uuid, Episode>,
) {
    let mut episodes = episodes_fallback.write().await;
    for (id, episode) in found_episodes {
        if !episodes.contains_key(id) {
            episodes.insert(*id, Arc::new(episode.clone()));
        }
    }
}