// ceres_core/traits.rs
//! Trait definitions for external dependencies.
//!
//! This module defines traits that abstract over external dependencies
//! (embedding providers, portal clients, data stores), enabling:
//!
//! - **Testability**: Mock implementations for unit testing
//! - **Flexibility**: Different backend implementations (e.g., different embedding APIs)
//! - **Decoupling**: Core business logic doesn't depend on specific implementations
//!
//! # Example
//!
//! ```
//! use ceres_core::traits::{EmbeddingProvider, DatasetStore};
//! use pgvector::Vector;
//!
//! // Business logic uses traits, not concrete types
//! async fn search_datasets<E, S>(
//!     embedding: &E,
//!     store: &S,
//!     query: &str,
//! ) -> Result<Vec<ceres_core::SearchResult>, ceres_core::AppError>
//! where
//!     E: EmbeddingProvider,
//!     S: DatasetStore,
//! {
//!     let vector: Vector = embedding.generate(query).await?.into();
//!     store.search(vector, 10).await
//! }
//! ```
31use std::collections::HashMap;
32use std::future::Future;
33
34use chrono::{DateTime, Utc};
35use futures::stream::BoxStream;
36use pgvector::Vector;
37use uuid::Uuid;
38
39use crate::{AppError, Dataset, NewDataset, SearchResult};
40
41/// Provider for generating text embeddings.
42///
43/// Implementations convert text into vector representations for semantic search.
44/// Different providers may produce vectors of different dimensions:
45/// - Gemini text-embedding-004: 768 dimensions
46/// - OpenAI text-embedding-3-small: 1536 dimensions
47/// - OpenAI text-embedding-3-large: 3072 dimensions
48pub trait EmbeddingProvider: Send + Sync + Clone {
49    /// Returns the provider identifier for logging and configuration.
50    ///
51    /// # Examples
52    ///
53    /// - `"gemini"` for Google Gemini
54    /// - `"openai"` for OpenAI
55    fn name(&self) -> &'static str;
56
57    /// Returns the embedding dimension this provider generates.
58    ///
59    /// This value must match the database column dimension for vector storage.
60    /// Mismatched dimensions will cause insertion failures.
61    fn dimension(&self) -> usize;
62
63    /// Generates an embedding vector for the given text.
64    ///
65    /// # Arguments
66    ///
67    /// * `text` - The text to embed
68    ///
69    /// # Returns
70    ///
71    /// A vector of floating-point values representing the text embedding.
72    /// The vector length must equal `self.dimension()`.
73    fn generate(&self, text: &str) -> impl Future<Output = Result<Vec<f32>, AppError>> + Send;
74
75    /// Generates embeddings for multiple texts in a batch.
76    ///
77    /// The default implementation calls `generate()` sequentially.
78    /// Providers with native batch API support should override for efficiency.
79    ///
80    /// # Arguments
81    ///
82    /// * `texts` - Slice of texts to embed
83    ///
84    /// # Returns
85    ///
86    /// A vector of embedding vectors, one per input text.
87    fn generate_batch(
88        &self,
89        texts: &[String],
90    ) -> impl Future<Output = Result<Vec<Vec<f32>>, AppError>> + Send {
91        let texts_owned: Vec<String> = texts.to_vec();
92        async move {
93            let mut results = Vec::with_capacity(texts_owned.len());
94            for text in &texts_owned {
95                results.push(self.generate(text).await?);
96            }
97            Ok(results)
98        }
99    }
100}
101
/// Client for accessing open data portals (CKAN, Socrata, etc.).
///
/// Implementations fetch dataset metadata from portal APIs and normalize
/// it into [`NewDataset`] records via [`PortalClient::into_new_dataset`].
pub trait PortalClient: Send + Sync + Clone {
    /// Type representing raw portal data before transformation.
    ///
    /// Each portal flavor deserializes responses into its own native shape;
    /// `into_new_dataset` performs the normalization afterwards.
    type PortalData: Send;

    /// Lists all dataset IDs available on the portal.
    ///
    /// Used by full harvests to enumerate everything before fetching details.
    fn list_dataset_ids(&self) -> impl Future<Output = Result<Vec<String>, AppError>> + Send;

    /// Fetches detailed metadata for a specific dataset.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset identifier (portal-native ID, as returned by
    ///   `list_dataset_ids`)
    fn get_dataset(
        &self,
        id: &str,
    ) -> impl Future<Output = Result<Self::PortalData, AppError>> + Send;

    /// Converts portal-specific data into a normalized NewDataset.
    ///
    /// This is an associated function (no `&self`): the conversion is a pure
    /// transformation that needs no client state. The `into_` prefix signals
    /// that `data` is consumed.
    ///
    /// # Arguments
    ///
    /// * `data` - The raw portal data
    /// * `portal_url` - The portal URL for source tracking
    fn into_new_dataset(data: Self::PortalData, portal_url: &str) -> NewDataset;

    /// Searches for datasets modified since the given timestamp.
    ///
    /// Used for incremental harvesting to fetch only recently modified datasets.
    /// Returns full dataset objects, eliminating the need for separate get_dataset calls.
    ///
    /// # Arguments
    ///
    /// * `since` - Only return datasets modified after this timestamp
    ///
    /// # Returns
    ///
    /// A vector of portal-specific dataset objects modified since the given time.
    /// Returns an error if the portal doesn't support incremental search.
    fn search_modified_since(
        &self,
        since: DateTime<Utc>,
    ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send;
}
148
/// Factory for creating portal clients.
///
/// Separate from PortalClient to avoid issues with async trait constructors:
/// client construction is synchronous and fallible, while the client's own
/// methods are async.
pub trait PortalClientFactory: Send + Sync + Clone {
    /// The type of portal client this factory creates.
    type Client: PortalClient;

    /// Creates a new portal client for the given URL.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The portal API base URL
    ///
    /// # Errors
    ///
    /// Returns an error if a client cannot be constructed for the URL
    /// (implementation-defined; e.g. an unrecognized or malformed URL).
    fn create(&self, portal_url: &str) -> Result<Self::Client, AppError>;
}
163
/// Store for dataset persistence and retrieval.
///
/// Implementations handle database operations for datasets, including
/// upserts, delta detection, vector search, streaming reads, and sync
/// bookkeeping.
pub trait DatasetStore: Send + Sync + Clone {
    /// Retrieves a dataset by its unique ID.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset's UUID
    ///
    /// # Returns
    ///
    /// The dataset if found, or None if not exists.
    fn get_by_id(&self, id: Uuid)
    -> impl Future<Output = Result<Option<Dataset>, AppError>> + Send;

    /// Retrieves content hashes for all datasets from a specific portal.
    ///
    /// Used for delta detection to determine which datasets need reprocessing.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// A map from original_id to optional content_hash (a `None` hash means
    /// the row exists but no hash has been recorded for it).
    fn get_hashes_for_portal(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<HashMap<String, Option<String>>, AppError>> + Send;

    /// Updates only the timestamp for an unchanged dataset.
    ///
    /// Used when content hash matches but we want to track "last seen" time.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_id` - The dataset's original ID from the portal
    fn update_timestamp_only(
        &self,
        portal_url: &str,
        original_id: &str,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Batch updates timestamps for multiple unchanged datasets.
    ///
    /// More efficient than calling `update_timestamp_only` for each dataset.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_ids` - Slice of dataset original IDs to update
    ///
    /// # Returns
    ///
    /// The number of rows actually updated (may be less than
    /// `original_ids.len()` if some IDs no longer exist).
    fn batch_update_timestamps(
        &self,
        portal_url: &str,
        original_ids: &[String],
    ) -> impl Future<Output = Result<u64, AppError>> + Send;

    /// Inserts or updates a dataset.
    ///
    /// # Arguments
    ///
    /// * `dataset` - The dataset to upsert
    ///
    /// # Returns
    ///
    /// The UUID of the affected row.
    fn upsert(&self, dataset: &NewDataset) -> impl Future<Output = Result<Uuid, AppError>> + Send;

    /// Performs vector similarity search.
    ///
    /// The query vector's dimension must match the store's vector column
    /// (see [`EmbeddingProvider::dimension`]).
    ///
    /// # Arguments
    ///
    /// * `query_vector` - The embedding vector to search for
    /// * `limit` - Maximum number of results
    ///
    /// # Returns
    ///
    /// Datasets ranked by similarity score (highest first).
    fn search(
        &self,
        query_vector: Vector,
        limit: usize,
    ) -> impl Future<Output = Result<Vec<SearchResult>, AppError>> + Send;

    /// Lists datasets as a stream with optional filtering.
    ///
    /// This method returns a stream of datasets for memory-efficient
    /// processing of large result sets. Unlike batch methods, it streams
    /// results directly from the database without loading everything into memory.
    ///
    /// Note: unlike the other methods, this returns the stream directly
    /// rather than a future — the stream itself is lazily evaluated.
    ///
    /// # Arguments
    ///
    /// * `portal_filter` - Optional portal URL to filter by
    /// * `limit` - Optional maximum number of records
    fn list_stream<'a>(
        &'a self,
        portal_filter: Option<&'a str>,
        limit: Option<usize>,
    ) -> BoxStream<'a, Result<Dataset, AppError>>;

    /// Retrieves the last successful sync timestamp for a portal.
    ///
    /// Used for incremental harvesting to determine which datasets
    /// have been modified since the last sync.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// The timestamp of the last successful sync, or None if never synced.
    fn get_last_sync_time(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<Option<DateTime<Utc>>, AppError>> + Send;

    /// Records a sync status for a portal.
    ///
    /// Called after a harvest operation to update the sync status.
    /// The `sync_status` parameter indicates the outcome: "completed" or "cancelled".
    ///
    /// NOTE(review): `sync_mode` and `sync_status` are stringly-typed;
    /// enums would make invalid values unrepresentable. Kept as `&str`
    /// here for interface stability — consider tightening in a follow-up.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `sync_time` - The timestamp of this sync
    /// * `sync_mode` - Either "full" or "incremental"
    /// * `sync_status` - The outcome: "completed" or "cancelled"
    /// * `datasets_synced` - Number of datasets processed
    fn record_sync_status(
        &self,
        portal_url: &str,
        sync_time: DateTime<Utc>,
        sync_mode: &str,
        sync_status: &str,
        datasets_synced: i32,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Checks database connectivity.
    ///
    /// Performs a simple query to verify the database is reachable and responsive.
    /// Used by health check endpoints.
    fn health_check(&self) -> impl Future<Output = Result<(), AppError>> + Send;
}