
ceres_core/traits.rs

//! Trait definitions for external dependencies.
//!
//! This module defines traits that abstract over external dependencies
//! (embedding providers, portal clients, data stores), enabling:
//!
//! - **Testability**: Mock implementations for unit testing
//! - **Flexibility**: Different backend implementations (e.g., different embedding APIs)
//! - **Decoupling**: Core business logic doesn't depend on specific implementations
//!
//! # Example
//!
//! ```
//! use ceres_core::traits::{EmbeddingProvider, DatasetStore};
//! use pgvector::Vector;
//!
//! // Business logic uses traits, not concrete types
//! async fn search_datasets<E, S>(
//!     embedding: &E,
//!     store: &S,
//!     query: &str,
//! ) -> Result<Vec<ceres_core::SearchResult>, ceres_core::AppError>
//! where
//!     E: EmbeddingProvider,
//!     S: DatasetStore,
//! {
//!     let vector: Vector = embedding.generate(query).await?.into();
//!     store.search(vector, 10).await
//! }
//! ```

use std::collections::HashMap;
use std::future::Future;

use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use pgvector::Vector;
use uuid::Uuid;

use crate::{AppError, Dataset, NewDataset, SearchResult};

/// Provider for generating text embeddings.
///
/// Implementations convert text into vector representations for semantic search.
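///
/// # Example
///
/// A minimal sketch of an implementation; the `FixedEmbedder` type here is
/// hypothetical and exists only to illustrate the required shape.
///
/// ```
/// use std::future::Future;
///
/// use ceres_core::AppError;
/// use ceres_core::traits::EmbeddingProvider;
///
/// // Test double that returns the same zero vector for any input.
/// #[derive(Clone)]
/// struct FixedEmbedder {
///     dimensions: usize,
/// }
///
/// impl EmbeddingProvider for FixedEmbedder {
///     fn generate(&self, _text: &str) -> impl Future<Output = Result<Vec<f32>, AppError>> + Send {
///         let dimensions = self.dimensions;
///         // A real provider would call an embedding API here.
///         async move { Ok(vec![0.0; dimensions]) }
///     }
/// }
/// ```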
pub trait EmbeddingProvider: Send + Sync + Clone {
    /// Generates an embedding vector for the given text.
    ///
    /// # Arguments
    ///
    /// * `text` - The text to embed
    ///
    /// # Returns
    ///
    /// A vector of floating-point values representing the text embedding.
    fn generate(&self, text: &str) -> impl Future<Output = Result<Vec<f32>, AppError>> + Send;
}

/// Client for accessing open data portals (CKAN, Socrata, etc.).
///
/// Implementations fetch dataset metadata from portal APIs.
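///
/// # Example
///
/// A sketch of a full-harvest pass written against the trait; `fetch_all` is a
/// hypothetical helper shown for illustration only.
///
/// ```
/// use ceres_core::{AppError, NewDataset};
/// use ceres_core::traits::PortalClient;
///
/// // Fetch every dataset the portal lists and normalize it.
/// async fn fetch_all<C: PortalClient>(
///     client: &C,
///     portal_url: &str,
/// ) -> Result<Vec<NewDataset>, AppError> {
///     let mut datasets = Vec::new();
///     for id in client.list_dataset_ids().await? {
///         let raw = client.get_dataset(&id).await?;
///         datasets.push(C::into_new_dataset(raw, portal_url));
///     }
///     Ok(datasets)
/// }
/// ```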
pub trait PortalClient: Send + Sync + Clone {
    /// Type representing raw portal data before transformation.
    type PortalData: Send;

    /// Lists all dataset IDs available on the portal.
    fn list_dataset_ids(&self) -> impl Future<Output = Result<Vec<String>, AppError>> + Send;

    /// Fetches detailed metadata for a specific dataset.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset identifier
    fn get_dataset(
        &self,
        id: &str,
    ) -> impl Future<Output = Result<Self::PortalData, AppError>> + Send;

    /// Converts portal-specific data into a normalized `NewDataset`.
    ///
    /// # Arguments
    ///
    /// * `data` - The raw portal data
    /// * `portal_url` - The portal URL for source tracking
    fn into_new_dataset(data: Self::PortalData, portal_url: &str) -> NewDataset;

    /// Searches for datasets modified since the given timestamp.
    ///
    /// Used for incremental harvesting to fetch only recently modified datasets.
    /// Returns full dataset objects, eliminating the need for separate `get_dataset` calls.
    ///
    /// # Arguments
    ///
    /// * `since` - Only return datasets modified after this timestamp
    ///
    /// # Returns
    ///
    /// A vector of portal-specific dataset objects modified since the given time.
    /// Returns an error if the portal doesn't support incremental search.
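    ///
    /// # Example
    ///
    /// A sketch of an incremental fetch, assuming a one-day lookback window
    /// chosen purely for illustration:
    ///
    /// ```
    /// use chrono::{Duration, Utc};
    /// use ceres_core::{AppError, NewDataset};
    /// use ceres_core::traits::PortalClient;
    ///
    /// // Fetch datasets changed in the last 24 hours and normalize them.
    /// async fn recently_modified<C: PortalClient>(
    ///     client: &C,
    ///     portal_url: &str,
    /// ) -> Result<Vec<NewDataset>, AppError> {
    ///     let since = Utc::now() - Duration::days(1);
    ///     let changed = client.search_modified_since(since).await?;
    ///     Ok(changed
    ///         .into_iter()
    ///         .map(|raw| C::into_new_dataset(raw, portal_url))
    ///         .collect())
    /// }
    /// ```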
    fn search_modified_since(
        &self,
        since: DateTime<Utc>,
    ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send;
}

/// Factory for creating portal clients.
///
/// Kept separate from `PortalClient` to avoid issues with async trait constructors.
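///
/// # Example
///
/// A sketch of generic code that builds one client per configured portal;
/// `clients_for` is a hypothetical helper used only to illustrate the trait.
///
/// ```
/// use ceres_core::AppError;
/// use ceres_core::traits::PortalClientFactory;
///
/// // Create a client for each portal URL, failing fast on the first error.
/// fn clients_for<F: PortalClientFactory>(
///     factory: &F,
///     portal_urls: &[&str],
/// ) -> Result<Vec<F::Client>, AppError> {
///     portal_urls.iter().map(|&url| factory.create(url)).collect()
/// }
/// ```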
pub trait PortalClientFactory: Send + Sync + Clone {
    /// The type of portal client this factory creates.
    type Client: PortalClient;

    /// Creates a new portal client for the given URL.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The portal API base URL
    fn create(&self, portal_url: &str) -> Result<Self::Client, AppError>;
}

/// Store for dataset persistence and retrieval.
///
/// Implementations handle database operations for datasets.
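///
/// # Example
///
/// A sketch of the delta-detection flow built on this trait. The `apply_delta`
/// helper and the shape of its `incoming` argument are hypothetical and shown
/// only to illustrate how the methods compose.
///
/// ```
/// use ceres_core::{AppError, NewDataset};
/// use ceres_core::traits::DatasetStore;
///
/// // Upsert changed datasets; only bump timestamps for unchanged ones.
/// async fn apply_delta<S: DatasetStore>(
///     store: &S,
///     portal_url: &str,
///     incoming: Vec<(String, Option<String>, NewDataset)>,
/// ) -> Result<(), AppError> {
///     let known_hashes = store.get_hashes_for_portal(portal_url).await?;
///     let mut unchanged = Vec::new();
///     for (original_id, content_hash, dataset) in incoming {
///         if known_hashes.get(&original_id) == Some(&content_hash) {
///             unchanged.push(original_id);
///         } else {
///             store.upsert(&dataset).await?;
///         }
///     }
///     store.batch_update_timestamps(portal_url, &unchanged).await?;
///     Ok(())
/// }
/// ```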
pub trait DatasetStore: Send + Sync + Clone {
    /// Retrieves a dataset by its unique ID.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset's UUID
    ///
    /// # Returns
    ///
    /// The dataset if found, or `None` if it does not exist.
    fn get_by_id(
        &self,
        id: Uuid,
    ) -> impl Future<Output = Result<Option<Dataset>, AppError>> + Send;

    /// Retrieves content hashes for all datasets from a specific portal.
    ///
    /// Used for delta detection to determine which datasets need reprocessing.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// A map from `original_id` to optional `content_hash`.
    fn get_hashes_for_portal(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<HashMap<String, Option<String>>, AppError>> + Send;

    /// Updates only the timestamp for an unchanged dataset.
    ///
    /// Used when the content hash matches but the "last seen" time should still be tracked.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_id` - The dataset's original ID from the portal
    fn update_timestamp_only(
        &self,
        portal_url: &str,
        original_id: &str,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Batch updates timestamps for multiple unchanged datasets.
    ///
    /// More efficient than calling `update_timestamp_only` for each dataset.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_ids` - Slice of dataset original IDs to update
    ///
    /// # Returns
    ///
    /// The number of rows actually updated.
    fn batch_update_timestamps(
        &self,
        portal_url: &str,
        original_ids: &[String],
    ) -> impl Future<Output = Result<u64, AppError>> + Send;

    /// Inserts or updates a dataset.
    ///
    /// # Arguments
    ///
    /// * `dataset` - The dataset to upsert
    ///
    /// # Returns
    ///
    /// The UUID of the affected row.
    fn upsert(&self, dataset: &NewDataset) -> impl Future<Output = Result<Uuid, AppError>> + Send;

    /// Performs vector similarity search.
    ///
    /// # Arguments
    ///
    /// * `query_vector` - The embedding vector to search for
    /// * `limit` - Maximum number of results
    ///
    /// # Returns
    ///
    /// Datasets ranked by similarity score (highest first).
    fn search(
        &self,
        query_vector: Vector,
        limit: usize,
    ) -> impl Future<Output = Result<Vec<SearchResult>, AppError>> + Send;

    /// Lists datasets as a stream with optional filtering.
    ///
    /// Returns a stream of datasets for memory-efficient processing of large
    /// result sets: results are yielded directly from the database rather than
    /// loaded into memory all at once.
    ///
    /// # Arguments
    ///
    /// * `portal_filter` - Optional portal URL to filter by
    /// * `limit` - Optional maximum number of records
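    ///
    /// # Example
    ///
    /// A sketch of consuming the stream with `futures::StreamExt`;
    /// `count_for_portal` is a hypothetical helper shown for illustration.
    ///
    /// ```
    /// use futures::StreamExt;
    ///
    /// use ceres_core::AppError;
    /// use ceres_core::traits::DatasetStore;
    ///
    /// // Count one portal's datasets without buffering them all in memory.
    /// async fn count_for_portal<S: DatasetStore>(
    ///     store: &S,
    ///     portal_url: &str,
    /// ) -> Result<u64, AppError> {
    ///     let mut datasets = store.list_stream(Some(portal_url), None);
    ///     let mut count = 0u64;
    ///     while let Some(next) = datasets.next().await {
    ///         next?;
    ///         count += 1;
    ///     }
    ///     Ok(count)
    /// }
    /// ```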
    fn list_stream<'a>(
        &'a self,
        portal_filter: Option<&'a str>,
        limit: Option<usize>,
    ) -> BoxStream<'a, Result<Dataset, AppError>>;

    /// Retrieves the last successful sync timestamp for a portal.
    ///
    /// Used for incremental harvesting to determine which datasets
    /// have been modified since the last sync.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// The timestamp of the last successful sync, or `None` if the portal has never been synced.
    fn get_last_sync_time(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<Option<DateTime<Utc>>, AppError>> + Send;

    /// Records a sync status for a portal.
    ///
    /// Called after a harvest operation to update the sync status.
    /// The `sync_status` parameter indicates the outcome: "completed" or "cancelled".
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `sync_time` - The timestamp of this sync
    /// * `sync_mode` - Either "full" or "incremental"
    /// * `sync_status` - The outcome: "completed" or "cancelled"
    /// * `datasets_synced` - Number of datasets processed
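    ///
    /// # Example
    ///
    /// A sketch of the bookkeeping around a harvest run; `finish_sync` is a
    /// hypothetical helper and the mode selection is illustrative only.
    ///
    /// ```
    /// use chrono::Utc;
    /// use ceres_core::AppError;
    /// use ceres_core::traits::DatasetStore;
    ///
    /// // Pick the sync mode from the previous sync time, then record the outcome.
    /// async fn finish_sync<S: DatasetStore>(
    ///     store: &S,
    ///     portal_url: &str,
    ///     datasets_synced: i32,
    /// ) -> Result<(), AppError> {
    ///     let sync_mode = match store.get_last_sync_time(portal_url).await? {
    ///         Some(_) => "incremental",
    ///         None => "full",
    ///     };
    ///     store
    ///         .record_sync_status(portal_url, Utc::now(), sync_mode, "completed", datasets_synced)
    ///         .await
    /// }
    /// ```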
    fn record_sync_status(
        &self,
        portal_url: &str,
        sync_time: DateTime<Utc>,
        sync_mode: &str,
        sync_status: &str,
        datasets_synced: i32,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Checks database connectivity.
    ///
    /// Performs a simple query to verify the database is reachable and responsive.
    /// Used by health check endpoints.
    fn health_check(&self) -> impl Future<Output = Result<(), AppError>> + Send;
}