Skip to main content

ceres_core/
traits.rs

//! Trait definitions for external dependencies.
//!
//! This module defines traits that abstract over external dependencies
//! (embedding providers, portal clients, data stores), enabling:
//!
//! - **Testability**: Mock implementations for unit testing
//! - **Flexibility**: Different backend implementations (e.g., different embedding APIs)
//! - **Decoupling**: Core business logic doesn't depend on specific implementations
//!
//! # Example
//!
//! ```
//! use ceres_core::traits::{EmbeddingProvider, DatasetStore};
//!
//! // Business logic uses traits, not concrete types
//! async fn search_datasets<E, S>(
//!     embedding: &E,
//!     store: &S,
//!     query: &str,
//! ) -> Result<Vec<ceres_core::SearchResult>, ceres_core::AppError>
//! where
//!     E: EmbeddingProvider,
//!     S: DatasetStore,
//! {
//!     let vector: Vec<f32> = embedding.generate(query).await?;
//!     store.search(vector, 10).await
//! }
//! ```
29
30use std::collections::HashMap;
31use std::future::Future;
32
33use chrono::{DateTime, Utc};
34use futures::stream::{self, BoxStream};
35use uuid::Uuid;
36
37use crate::config::PortalType;
38use crate::{AppError, Dataset, NewDataset, SearchResult};
39
40/// Provider for generating text embeddings.
41///
42/// Implementations convert text into vector representations for semantic search.
43/// Different providers may produce vectors of different dimensions:
44/// - Gemini text-embedding-004: 768 dimensions
45/// - OpenAI text-embedding-3-small: 1536 dimensions
46/// - OpenAI text-embedding-3-large: 3072 dimensions
47pub trait EmbeddingProvider: Send + Sync + Clone {
48    /// Returns the provider identifier for logging and configuration.
49    ///
50    /// # Examples
51    ///
52    /// - `"gemini"` for Google Gemini
53    /// - `"openai"` for OpenAI
54    fn name(&self) -> &'static str;
55
56    /// Returns the embedding dimension this provider generates.
57    ///
58    /// This value must match the database column dimension for vector storage.
59    /// Mismatched dimensions will cause insertion failures.
60    fn dimension(&self) -> usize;
61
62    /// Generates an embedding vector for the given text.
63    ///
64    /// # Arguments
65    ///
66    /// * `text` - The text to embed
67    ///
68    /// # Returns
69    ///
70    /// A vector of floating-point values representing the text embedding.
71    /// The vector length must equal `self.dimension()`.
72    fn generate(&self, text: &str) -> impl Future<Output = Result<Vec<f32>, AppError>> + Send;
73
74    /// Maximum number of texts supported per batch API call.
75    ///
76    /// The harvest pipeline uses `min(config.embedding_batch_size, max_batch_size())`
77    /// to ensure batches never exceed provider limits.
78    ///
79    /// # Defaults
80    ///
81    /// Returns `1` (single-item batches). Providers with native batch support
82    /// should override to enable efficient batching.
83    fn max_batch_size(&self) -> usize {
84        1
85    }
86
87    /// Generates embeddings for multiple texts in a batch.
88    ///
89    /// The default implementation calls `generate()` sequentially.
90    /// Providers with native batch API support should override for efficiency.
91    ///
92    /// # Arguments
93    ///
94    /// * `texts` - Slice of texts to embed
95    ///
96    /// # Returns
97    ///
98    /// A vector of embedding vectors, one per input text.
99    fn generate_batch(
100        &self,
101        texts: &[String],
102    ) -> impl Future<Output = Result<Vec<Vec<f32>>, AppError>> + Send {
103        let texts_owned: Vec<String> = texts.to_vec();
104        async move {
105            let mut results = Vec::with_capacity(texts_owned.len());
106            for text in &texts_owned {
107                results.push(self.generate(text).await?);
108            }
109            Ok(results)
110        }
111    }
112}
113
114/// Client for accessing open data portals (CKAN, Socrata, etc.).
115///
116/// Implementations fetch dataset metadata from portal APIs.
117pub trait PortalClient: Send + Sync + Clone {
118    /// Type representing raw portal data before transformation.
119    type PortalData: Send;
120
121    /// Returns the portal type identifier (e.g., "ckan", "socrata", "dcat").
122    fn portal_type(&self) -> &'static str;
123
124    /// Returns the base URL of the portal.
125    fn base_url(&self) -> &str;
126
127    /// Lists all dataset IDs available on the portal.
128    fn list_dataset_ids(&self) -> impl Future<Output = Result<Vec<String>, AppError>> + Send;
129
130    /// Fetches detailed metadata for a specific dataset.
131    ///
132    /// # Arguments
133    ///
134    /// * `id` - The dataset identifier
135    fn get_dataset(
136        &self,
137        id: &str,
138    ) -> impl Future<Output = Result<Self::PortalData, AppError>> + Send;
139
140    /// Converts portal-specific data into a normalized NewDataset.
141    ///
142    /// # Arguments
143    ///
144    /// * `data` - The raw portal data
145    /// * `portal_url` - The portal URL for source tracking
146    /// * `url_template` - Optional URL template with `{id}` and `{name}` placeholders
147    /// * `language` - Preferred language for resolving multilingual fields
148    fn into_new_dataset(
149        data: Self::PortalData,
150        portal_url: &str,
151        url_template: Option<&str>,
152        language: &str,
153    ) -> NewDataset;
154
155    /// Searches for datasets modified since the given timestamp.
156    ///
157    /// Used for incremental harvesting to fetch only recently modified datasets.
158    /// Returns full dataset objects, eliminating the need for separate get_dataset calls.
159    ///
160    /// # Arguments
161    ///
162    /// * `since` - Only return datasets modified after this timestamp
163    ///
164    /// # Returns
165    ///
166    /// A vector of portal-specific dataset objects modified since the given time.
167    /// Returns an error if the portal doesn't support incremental search.
168    fn search_modified_since(
169        &self,
170        since: DateTime<Utc>,
171    ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send;
172
173    /// Fetches all datasets from the portal in bulk using paginated search.
174    ///
175    /// This is far more efficient than `list_dataset_ids()` + individual
176    /// `get_dataset()` calls for large portals (e.g., HDX with ~40k datasets),
177    /// as it avoids per-dataset HTTP requests and rate limiting.
178    ///
179    /// Returns full dataset objects ready for processing.
180    /// The default implementation falls back to `list_dataset_ids()` + `get_dataset()`
181    /// for portals that don't support bulk search.
182    fn search_all_datasets(
183        &self,
184    ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send {
185        async {
186            Err(AppError::Generic(
187                "search_all_datasets not supported".to_string(),
188            ))
189        }
190    }
191
192    /// Streams all datasets from the portal page-by-page.
193    ///
194    /// Each stream item is a page of datasets (e.g., 1000 for CKAN, 100 for DCAT),
195    /// matching natural pagination boundaries. This bounds peak memory to one page
196    /// instead of accumulating the entire catalog.
197    ///
198    /// The default implementation wraps `search_all_datasets()` as a single-page
199    /// stream, so existing implementors work without changes.
200    fn search_all_datasets_stream(&self) -> BoxStream<'_, Result<Vec<Self::PortalData>, AppError>> {
201        Box::pin(stream::once(self.search_all_datasets()))
202    }
203
204    /// Returns the total number of datasets on the portal.
205    ///
206    /// Used for progress reporting when streaming. Portals that support a cheap
207    /// count query should override this. The default returns an error.
208    fn dataset_count(&self) -> impl Future<Output = Result<usize, AppError>> + Send {
209        async { Err(AppError::Generic("dataset_count not supported".to_string())) }
210    }
211}
212
213/// Factory for creating portal clients.
214///
215/// Separate from PortalClient to avoid issues with async trait constructors.
216pub trait PortalClientFactory: Send + Sync + Clone {
217    /// The type of portal client this factory creates.
218    type Client: PortalClient;
219
220    /// Creates a new portal client for the given URL, portal type, and optional profile.
221    ///
222    /// # Arguments
223    ///
224    /// * `portal_url` - The portal API base URL
225    /// * `portal_type` - The type of portal to create a client for
226    /// * `language` - Preferred language for multilingual portals (e.g. "en", "fr")
227    /// * `profile` - Optional profile for sub-dispatch (e.g. `"sparql"` for DCAT portals)
228    /// * `sparql_endpoint` - Optional custom SPARQL endpoint URL (overrides `{url}/sparql`)
229    fn create(
230        &self,
231        portal_url: &str,
232        portal_type: PortalType,
233        language: &str,
234        profile: Option<&str>,
235        sparql_endpoint: Option<&str>,
236    ) -> Result<Self::Client, AppError>;
237}
238
239/// Store for dataset persistence and retrieval.
240///
241/// Implementations handle database operations for datasets.
242pub trait DatasetStore: Send + Sync + Clone {
243    /// Retrieves a dataset by its unique ID.
244    ///
245    /// # Arguments
246    ///
247    /// * `id` - The dataset's UUID
248    ///
249    /// # Returns
250    ///
251    /// The dataset if found, or None if not exists.
252    fn get_by_id(&self, id: Uuid)
253    -> impl Future<Output = Result<Option<Dataset>, AppError>> + Send;
254
255    /// Retrieves content hashes for all datasets from a specific portal.
256    ///
257    /// Used for delta detection to determine which datasets need reprocessing.
258    ///
259    /// # Arguments
260    ///
261    /// * `portal_url` - The source portal URL
262    ///
263    /// # Returns
264    ///
265    /// A map from original_id to optional content_hash.
266    fn get_hashes_for_portal(
267        &self,
268        portal_url: &str,
269    ) -> impl Future<Output = Result<HashMap<String, Option<String>>, AppError>> + Send;
270
271    /// Updates only the timestamp for an unchanged dataset.
272    ///
273    /// Used when content hash matches but we want to track "last seen" time.
274    ///
275    /// # Arguments
276    ///
277    /// * `portal_url` - The source portal URL
278    /// * `original_id` - The dataset's original ID from the portal
279    fn update_timestamp_only(
280        &self,
281        portal_url: &str,
282        original_id: &str,
283    ) -> impl Future<Output = Result<(), AppError>> + Send;
284
285    /// Batch updates timestamps for multiple unchanged datasets.
286    ///
287    /// More efficient than calling `update_timestamp_only` for each dataset.
288    ///
289    /// # Arguments
290    ///
291    /// * `portal_url` - The source portal URL
292    /// * `original_ids` - Slice of dataset original IDs to update
293    ///
294    /// # Returns
295    ///
296    /// The number of rows actually updated.
297    fn batch_update_timestamps(
298        &self,
299        portal_url: &str,
300        original_ids: &[String],
301    ) -> impl Future<Output = Result<u64, AppError>> + Send;
302
303    /// Marks datasets as stale if they were not seen during the latest full sync.
304    ///
305    /// After a successful full sync, any dataset whose `last_updated_at` is older
306    /// than `sync_start` was not present in the portal's response.
307    ///
308    /// # Arguments
309    ///
310    /// * `portal_url` - The source portal URL
311    /// * `sync_start` - Timestamp recorded at the start of the sync
312    ///
313    /// # Returns
314    ///
315    /// The number of datasets newly marked as stale.
316    fn mark_stale_datasets(
317        &self,
318        portal_url: &str,
319        sync_start: DateTime<Utc>,
320    ) -> impl Future<Output = Result<u64, AppError>> + Send;
321
322    /// Marks datasets as stale if their original_id is NOT in the given set.
323    ///
324    /// This is more efficient than the timestamp-based approach because it
325    /// avoids updating every unchanged row just to compare timestamps later.
326    /// Instead, we directly identify stale datasets by exclusion.
327    ///
328    /// # Arguments
329    ///
330    /// * `portal_url` - The source portal URL
331    /// * `seen_ids` - All original_ids seen during the current full sync
332    ///
333    /// # Returns
334    ///
335    /// The number of datasets newly marked as stale.
336    fn mark_stale_by_exclusion(
337        &self,
338        portal_url: &str,
339        seen_ids: &[String],
340    ) -> impl Future<Output = Result<u64, AppError>> + Send;
341
342    /// Inserts or updates a dataset.
343    ///
344    /// # Arguments
345    ///
346    /// * `dataset` - The dataset to upsert
347    ///
348    /// # Returns
349    ///
350    /// The UUID of the affected row.
351    fn upsert(&self, dataset: &NewDataset) -> impl Future<Output = Result<Uuid, AppError>> + Send;
352
353    /// Batch upserts multiple datasets in a single operation.
354    ///
355    /// Much faster than calling `upsert` in a loop because it reduces
356    /// database round-trips and amortizes index update costs.
357    ///
358    /// # Arguments
359    ///
360    /// * `datasets` - Slice of datasets to upsert
361    ///
362    /// # Returns
363    ///
364    /// The UUIDs of all affected rows.
365    fn batch_upsert(
366        &self,
367        datasets: &[NewDataset],
368    ) -> impl Future<Output = Result<Vec<Uuid>, AppError>> + Send;
369
370    /// Performs vector similarity search.
371    ///
372    /// # Arguments
373    ///
374    /// * `query_vector` - The embedding vector to search for
375    /// * `limit` - Maximum number of results
376    ///
377    /// # Returns
378    ///
379    /// Datasets ranked by similarity score (highest first).
380    fn search(
381        &self,
382        query_vector: Vec<f32>,
383        limit: usize,
384    ) -> impl Future<Output = Result<Vec<SearchResult>, AppError>> + Send;
385
386    /// Lists datasets as a stream with optional filtering.
387    ///
388    /// This method returns a stream of datasets for memory-efficient
389    /// processing of large result sets. Unlike batch methods, it streams
390    /// results directly from the database without loading everything into memory.
391    ///
392    /// # Arguments
393    ///
394    /// * `portal_filter` - Optional portal URL to filter by
395    /// * `limit` - Optional maximum number of records
396    fn list_stream<'a>(
397        &'a self,
398        portal_filter: Option<&'a str>,
399        limit: Option<usize>,
400    ) -> BoxStream<'a, Result<Dataset, AppError>>;
401
402    /// Retrieves the last successful sync timestamp for a portal.
403    ///
404    /// Used for incremental harvesting to determine which datasets
405    /// have been modified since the last sync.
406    ///
407    /// # Arguments
408    ///
409    /// * `portal_url` - The source portal URL
410    ///
411    /// # Returns
412    ///
413    /// The timestamp of the last successful sync, or None if never synced.
414    fn get_last_sync_time(
415        &self,
416        portal_url: &str,
417    ) -> impl Future<Output = Result<Option<DateTime<Utc>>, AppError>> + Send;
418
419    /// Records a sync status for a portal.
420    ///
421    /// Called after a harvest operation to update the sync status.
422    /// The `sync_status` parameter indicates the outcome: "completed" or "cancelled".
423    ///
424    /// # Arguments
425    ///
426    /// * `portal_url` - The source portal URL
427    /// * `sync_time` - The timestamp of this sync
428    /// * `sync_mode` - Either "full" or "incremental"
429    ///   TODO(design): sync_mode/sync_status should be typed enums, not &str
430    /// * `sync_status` - The outcome: "completed" or "cancelled"
431    /// * `datasets_synced` - Number of datasets processed
432    fn record_sync_status(
433        &self,
434        portal_url: &str,
435        sync_time: DateTime<Utc>,
436        sync_mode: &str,
437        sync_status: &str,
438        datasets_synced: i32,
439    ) -> impl Future<Output = Result<(), AppError>> + Send;
440
441    /// Returns lowercased titles that appear across multiple portals.
442    ///
443    /// Used for cross-portal duplicate detection in Parquet exports.
444    /// Typically returns ~21k titles (~2MB) for the full dataset.
445    fn get_duplicate_titles(
446        &self,
447    ) -> impl Future<Output = Result<std::collections::HashSet<String>, AppError>> + Send;
448
449    /// Lists datasets that have no embedding vector (`embedding IS NULL`).
450    ///
451    /// Used by [`crate::EmbeddingService`] to find datasets needing embedding generation.
452    ///
453    /// # Arguments
454    ///
455    /// * `portal_filter` - Optional portal URL to scope the query
456    /// * `limit` - Maximum number of datasets to return
457    fn list_pending_embeddings(
458        &self,
459        portal_filter: Option<&str>,
460        limit: Option<usize>,
461    ) -> impl Future<Output = Result<Vec<Dataset>, AppError>> + Send;
462
463    /// Counts datasets with `embedding IS NULL`.
464    ///
465    /// Used for progress reporting in the embedding service.
466    ///
467    /// # Arguments
468    ///
469    /// * `portal_filter` - Optional portal URL to scope the count
470    fn count_pending_embeddings(
471        &self,
472        portal_filter: Option<&str>,
473    ) -> impl Future<Output = Result<i64, AppError>> + Send;
474
475    /// Checks database connectivity.
476    ///
477    /// Performs a simple query to verify the database is reachable and responsive.
478    /// Used by health check endpoints.
479    fn health_check(&self) -> impl Future<Output = Result<(), AppError>> + Send;
480}