ceres_core/traits.rs
1//! Trait definitions for external dependencies.
2//!
3//! This module defines traits that abstract over external dependencies
4//! (embedding providers, portal clients, data stores), enabling:
5//!
6//! - **Testability**: Mock implementations for unit testing
7//! - **Flexibility**: Different backend implementations (e.g., different embedding APIs)
8//! - **Decoupling**: Core business logic doesn't depend on specific implementations
9//!
10//! # Example
11//!
12//! ```
13//! use ceres_core::traits::{EmbeddingProvider, DatasetStore};
14//!
15//! // Business logic uses traits, not concrete types
16//! async fn search_datasets<E, S>(
17//! embedding: &E,
18//! store: &S,
19//! query: &str,
20//! ) -> Result<Vec<ceres_core::SearchResult>, ceres_core::AppError>
21//! where
22//! E: EmbeddingProvider,
23//! S: DatasetStore,
24//! {
25//! let vector: Vec<f32> = embedding.generate(query).await?;
26//! store.search(vector, 10).await
27//! }
28//! ```
29
30use std::collections::HashMap;
31use std::future::Future;
32
33use chrono::{DateTime, Utc};
34use futures::stream::BoxStream;
35use uuid::Uuid;
36
37use crate::config::PortalType;
38use crate::{AppError, Dataset, NewDataset, SearchResult};
39
40/// Provider for generating text embeddings.
41///
42/// Implementations convert text into vector representations for semantic search.
43/// Different providers may produce vectors of different dimensions:
44/// - Gemini text-embedding-004: 768 dimensions
45/// - OpenAI text-embedding-3-small: 1536 dimensions
46/// - OpenAI text-embedding-3-large: 3072 dimensions
47pub trait EmbeddingProvider: Send + Sync + Clone {
48 /// Returns the provider identifier for logging and configuration.
49 ///
50 /// # Examples
51 ///
52 /// - `"gemini"` for Google Gemini
53 /// - `"openai"` for OpenAI
54 fn name(&self) -> &'static str;
55
56 /// Returns the embedding dimension this provider generates.
57 ///
58 /// This value must match the database column dimension for vector storage.
59 /// Mismatched dimensions will cause insertion failures.
60 fn dimension(&self) -> usize;
61
62 /// Generates an embedding vector for the given text.
63 ///
64 /// # Arguments
65 ///
66 /// * `text` - The text to embed
67 ///
68 /// # Returns
69 ///
70 /// A vector of floating-point values representing the text embedding.
71 /// The vector length must equal `self.dimension()`.
72 fn generate(&self, text: &str) -> impl Future<Output = Result<Vec<f32>, AppError>> + Send;
73
74 /// Maximum number of texts supported per batch API call.
75 ///
76 /// The harvest pipeline uses `min(config.embedding_batch_size, max_batch_size())`
77 /// to ensure batches never exceed provider limits.
78 ///
79 /// # Defaults
80 ///
81 /// Returns `1` (single-item batches). Providers with native batch support
82 /// should override to enable efficient batching.
83 fn max_batch_size(&self) -> usize {
84 1
85 }
86
87 /// Generates embeddings for multiple texts in a batch.
88 ///
89 /// The default implementation calls `generate()` sequentially.
90 /// Providers with native batch API support should override for efficiency.
91 ///
92 /// # Arguments
93 ///
94 /// * `texts` - Slice of texts to embed
95 ///
96 /// # Returns
97 ///
98 /// A vector of embedding vectors, one per input text.
99 fn generate_batch(
100 &self,
101 texts: &[String],
102 ) -> impl Future<Output = Result<Vec<Vec<f32>>, AppError>> + Send {
103 let texts_owned: Vec<String> = texts.to_vec();
104 async move {
105 let mut results = Vec::with_capacity(texts_owned.len());
106 for text in &texts_owned {
107 results.push(self.generate(text).await?);
108 }
109 Ok(results)
110 }
111 }
112}
113
114/// Client for accessing open data portals (CKAN, Socrata, etc.).
115///
116/// Implementations fetch dataset metadata from portal APIs.
117pub trait PortalClient: Send + Sync + Clone {
118 /// Type representing raw portal data before transformation.
119 type PortalData: Send;
120
121 /// Returns the portal type identifier (e.g., "ckan", "socrata", "dcat").
122 fn portal_type(&self) -> &'static str;
123
124 /// Returns the base URL of the portal.
125 fn base_url(&self) -> &str;
126
127 /// Lists all dataset IDs available on the portal.
128 fn list_dataset_ids(&self) -> impl Future<Output = Result<Vec<String>, AppError>> + Send;
129
130 /// Fetches detailed metadata for a specific dataset.
131 ///
132 /// # Arguments
133 ///
134 /// * `id` - The dataset identifier
135 fn get_dataset(
136 &self,
137 id: &str,
138 ) -> impl Future<Output = Result<Self::PortalData, AppError>> + Send;
139
140 /// Converts portal-specific data into a normalized NewDataset.
141 ///
142 /// # Arguments
143 ///
144 /// * `data` - The raw portal data
145 /// * `portal_url` - The portal URL for source tracking
146 /// * `url_template` - Optional URL template with `{id}` and `{name}` placeholders
147 /// * `language` - Preferred language for resolving multilingual fields
148 fn into_new_dataset(
149 data: Self::PortalData,
150 portal_url: &str,
151 url_template: Option<&str>,
152 language: &str,
153 ) -> NewDataset;
154
155 /// Searches for datasets modified since the given timestamp.
156 ///
157 /// Used for incremental harvesting to fetch only recently modified datasets.
158 /// Returns full dataset objects, eliminating the need for separate get_dataset calls.
159 ///
160 /// # Arguments
161 ///
162 /// * `since` - Only return datasets modified after this timestamp
163 ///
164 /// # Returns
165 ///
166 /// A vector of portal-specific dataset objects modified since the given time.
167 /// Returns an error if the portal doesn't support incremental search.
168 fn search_modified_since(
169 &self,
170 since: DateTime<Utc>,
171 ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send;
172
173 /// Fetches all datasets from the portal in bulk using paginated search.
174 ///
175 /// This is far more efficient than `list_dataset_ids()` + individual
176 /// `get_dataset()` calls for large portals (e.g., HDX with ~40k datasets),
177 /// as it avoids per-dataset HTTP requests and rate limiting.
178 ///
179 /// Returns full dataset objects ready for processing.
180 /// The default implementation falls back to `list_dataset_ids()` + `get_dataset()`
181 /// for portals that don't support bulk search.
182 fn search_all_datasets(
183 &self,
184 ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send {
185 async {
186 Err(AppError::Generic(
187 "search_all_datasets not supported".to_string(),
188 ))
189 }
190 }
191}
192
193/// Factory for creating portal clients.
194///
195/// Separate from PortalClient to avoid issues with async trait constructors.
196pub trait PortalClientFactory: Send + Sync + Clone {
197 /// The type of portal client this factory creates.
198 type Client: PortalClient;
199
200 /// Creates a new portal client for the given URL and portal type.
201 ///
202 /// # Arguments
203 ///
204 /// * `portal_url` - The portal API base URL
205 /// * `portal_type` - The type of portal to create a client for
206 fn create(&self, portal_url: &str, portal_type: PortalType) -> Result<Self::Client, AppError>;
207}
208
209/// Store for dataset persistence and retrieval.
210///
211/// Implementations handle database operations for datasets.
212pub trait DatasetStore: Send + Sync + Clone {
213 /// Retrieves a dataset by its unique ID.
214 ///
215 /// # Arguments
216 ///
217 /// * `id` - The dataset's UUID
218 ///
219 /// # Returns
220 ///
221 /// The dataset if found, or None if not exists.
222 fn get_by_id(&self, id: Uuid)
223 -> impl Future<Output = Result<Option<Dataset>, AppError>> + Send;
224
225 /// Retrieves content hashes for all datasets from a specific portal.
226 ///
227 /// Used for delta detection to determine which datasets need reprocessing.
228 ///
229 /// # Arguments
230 ///
231 /// * `portal_url` - The source portal URL
232 ///
233 /// # Returns
234 ///
235 /// A map from original_id to optional content_hash.
236 fn get_hashes_for_portal(
237 &self,
238 portal_url: &str,
239 ) -> impl Future<Output = Result<HashMap<String, Option<String>>, AppError>> + Send;
240
241 /// Updates only the timestamp for an unchanged dataset.
242 ///
243 /// Used when content hash matches but we want to track "last seen" time.
244 ///
245 /// # Arguments
246 ///
247 /// * `portal_url` - The source portal URL
248 /// * `original_id` - The dataset's original ID from the portal
249 fn update_timestamp_only(
250 &self,
251 portal_url: &str,
252 original_id: &str,
253 ) -> impl Future<Output = Result<(), AppError>> + Send;
254
255 /// Batch updates timestamps for multiple unchanged datasets.
256 ///
257 /// More efficient than calling `update_timestamp_only` for each dataset.
258 ///
259 /// # Arguments
260 ///
261 /// * `portal_url` - The source portal URL
262 /// * `original_ids` - Slice of dataset original IDs to update
263 ///
264 /// # Returns
265 ///
266 /// The number of rows actually updated.
267 fn batch_update_timestamps(
268 &self,
269 portal_url: &str,
270 original_ids: &[String],
271 ) -> impl Future<Output = Result<u64, AppError>> + Send;
272
273 /// Marks datasets as stale if they were not seen during the latest full sync.
274 ///
275 /// After a successful full sync, any dataset whose `last_updated_at` is older
276 /// than `sync_start` was not present in the portal's response.
277 ///
278 /// # Arguments
279 ///
280 /// * `portal_url` - The source portal URL
281 /// * `sync_start` - Timestamp recorded at the start of the sync
282 ///
283 /// # Returns
284 ///
285 /// The number of datasets newly marked as stale.
286 fn mark_stale_datasets(
287 &self,
288 portal_url: &str,
289 sync_start: DateTime<Utc>,
290 ) -> impl Future<Output = Result<u64, AppError>> + Send;
291
292 /// Marks datasets as stale if their original_id is NOT in the given set.
293 ///
294 /// This is more efficient than the timestamp-based approach because it
295 /// avoids updating every unchanged row just to compare timestamps later.
296 /// Instead, we directly identify stale datasets by exclusion.
297 ///
298 /// # Arguments
299 ///
300 /// * `portal_url` - The source portal URL
301 /// * `seen_ids` - All original_ids seen during the current full sync
302 ///
303 /// # Returns
304 ///
305 /// The number of datasets newly marked as stale.
306 fn mark_stale_by_exclusion(
307 &self,
308 portal_url: &str,
309 seen_ids: &[String],
310 ) -> impl Future<Output = Result<u64, AppError>> + Send;
311
312 /// Inserts or updates a dataset.
313 ///
314 /// # Arguments
315 ///
316 /// * `dataset` - The dataset to upsert
317 ///
318 /// # Returns
319 ///
320 /// The UUID of the affected row.
321 fn upsert(&self, dataset: &NewDataset) -> impl Future<Output = Result<Uuid, AppError>> + Send;
322
323 /// Batch upserts multiple datasets in a single operation.
324 ///
325 /// Much faster than calling `upsert` in a loop because it reduces
326 /// database round-trips and amortizes index update costs.
327 ///
328 /// # Arguments
329 ///
330 /// * `datasets` - Slice of datasets to upsert
331 ///
332 /// # Returns
333 ///
334 /// The UUIDs of all affected rows.
335 fn batch_upsert(
336 &self,
337 datasets: &[NewDataset],
338 ) -> impl Future<Output = Result<Vec<Uuid>, AppError>> + Send;
339
340 /// Performs vector similarity search.
341 ///
342 /// # Arguments
343 ///
344 /// * `query_vector` - The embedding vector to search for
345 /// * `limit` - Maximum number of results
346 ///
347 /// # Returns
348 ///
349 /// Datasets ranked by similarity score (highest first).
350 fn search(
351 &self,
352 query_vector: Vec<f32>,
353 limit: usize,
354 ) -> impl Future<Output = Result<Vec<SearchResult>, AppError>> + Send;
355
356 /// Lists datasets as a stream with optional filtering.
357 ///
358 /// This method returns a stream of datasets for memory-efficient
359 /// processing of large result sets. Unlike batch methods, it streams
360 /// results directly from the database without loading everything into memory.
361 ///
362 /// # Arguments
363 ///
364 /// * `portal_filter` - Optional portal URL to filter by
365 /// * `limit` - Optional maximum number of records
366 fn list_stream<'a>(
367 &'a self,
368 portal_filter: Option<&'a str>,
369 limit: Option<usize>,
370 ) -> BoxStream<'a, Result<Dataset, AppError>>;
371
372 /// Retrieves the last successful sync timestamp for a portal.
373 ///
374 /// Used for incremental harvesting to determine which datasets
375 /// have been modified since the last sync.
376 ///
377 /// # Arguments
378 ///
379 /// * `portal_url` - The source portal URL
380 ///
381 /// # Returns
382 ///
383 /// The timestamp of the last successful sync, or None if never synced.
384 fn get_last_sync_time(
385 &self,
386 portal_url: &str,
387 ) -> impl Future<Output = Result<Option<DateTime<Utc>>, AppError>> + Send;
388
389 /// Records a sync status for a portal.
390 ///
391 /// Called after a harvest operation to update the sync status.
392 /// The `sync_status` parameter indicates the outcome: "completed" or "cancelled".
393 ///
394 /// # Arguments
395 ///
396 /// * `portal_url` - The source portal URL
397 /// * `sync_time` - The timestamp of this sync
398 /// * `sync_mode` - Either "full" or "incremental"
399 /// TODO(design): sync_mode/sync_status should be typed enums, not &str
400 /// * `sync_status` - The outcome: "completed" or "cancelled"
401 /// * `datasets_synced` - Number of datasets processed
402 fn record_sync_status(
403 &self,
404 portal_url: &str,
405 sync_time: DateTime<Utc>,
406 sync_mode: &str,
407 sync_status: &str,
408 datasets_synced: i32,
409 ) -> impl Future<Output = Result<(), AppError>> + Send;
410
411 /// Returns lowercased titles that appear across multiple portals.
412 ///
413 /// Used for cross-portal duplicate detection in Parquet exports.
414 /// Typically returns ~21k titles (~2MB) for the full dataset.
415 fn get_duplicate_titles(
416 &self,
417 ) -> impl Future<Output = Result<std::collections::HashSet<String>, AppError>> + Send;
418
419 /// Lists datasets that have no embedding vector (`embedding IS NULL`).
420 ///
421 /// Used by [`crate::EmbeddingService`] to find datasets needing embedding generation.
422 ///
423 /// # Arguments
424 ///
425 /// * `portal_filter` - Optional portal URL to scope the query
426 /// * `limit` - Maximum number of datasets to return
427 fn list_pending_embeddings(
428 &self,
429 portal_filter: Option<&str>,
430 limit: Option<usize>,
431 ) -> impl Future<Output = Result<Vec<Dataset>, AppError>> + Send;
432
433 /// Counts datasets with `embedding IS NULL`.
434 ///
435 /// Used for progress reporting in the embedding service.
436 ///
437 /// # Arguments
438 ///
439 /// * `portal_filter` - Optional portal URL to scope the count
440 fn count_pending_embeddings(
441 &self,
442 portal_filter: Option<&str>,
443 ) -> impl Future<Output = Result<i64, AppError>> + Send;
444
445 /// Checks database connectivity.
446 ///
447 /// Performs a simple query to verify the database is reachable and responsive.
448 /// Used by health check endpoints.
449 fn health_check(&self) -> impl Future<Output = Result<(), AppError>> + Send;
450}