ceres_core/traits.rs
//! Trait definitions for external dependencies.
//!
//! This module defines traits that abstract over external dependencies
//! (embedding providers, portal clients, data stores), enabling:
//!
//! - **Testability**: Mock implementations for unit testing
//! - **Flexibility**: Different backend implementations (e.g., different embedding APIs)
//! - **Decoupling**: Core business logic doesn't depend on specific implementations
//!
//! # Example
//!
//! ```
//! use ceres_core::traits::{EmbeddingProvider, DatasetStore};
//! use pgvector::Vector;
//!
//! // Business logic uses traits, not concrete types
//! async fn search_datasets<E, S>(
//!     embedding: &E,
//!     store: &S,
//!     query: &str,
//! ) -> Result<Vec<ceres_core::SearchResult>, ceres_core::AppError>
//! where
//!     E: EmbeddingProvider,
//!     S: DatasetStore,
//! {
//!     let vector: Vector = embedding.generate(query).await?.into();
//!     store.search(vector, 10).await
//! }
//! ```

use std::collections::HashMap;
use std::future::Future;

use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use pgvector::Vector;
use uuid::Uuid;

use crate::{AppError, Dataset, NewDataset, SearchResult};

/// Provider for generating text embeddings.
///
/// Implementations convert text into vector representations for semantic search.
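///
/// # Example
///
/// A minimal sketch of a mock implementation for tests; `FixedEmbedder` is
/// hypothetical and not part of this crate:
///
/// ```
/// use ceres_core::traits::EmbeddingProvider;
/// use ceres_core::AppError;
///
/// #[derive(Clone)]
/// struct FixedEmbedder;
///
/// impl EmbeddingProvider for FixedEmbedder {
///     // Always returns the same vector, which is enough for unit tests.
///     async fn generate(&self, _text: &str) -> Result<Vec<f32>, AppError> {
///         Ok(vec![0.0; 384])
///     }
/// }
/// ```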
pub trait EmbeddingProvider: Send + Sync + Clone {
    /// Generates an embedding vector for the given text.
    ///
    /// # Arguments
    ///
    /// * `text` - The text to embed
    ///
    /// # Returns
    ///
    /// A vector of floating-point values representing the text embedding.
    fn generate(&self, text: &str) -> impl Future<Output = Result<Vec<f32>, AppError>> + Send;
}

/// Client for accessing open data portals (CKAN, Socrata, etc.).
///
/// Implementations fetch dataset metadata from portal APIs.
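///
/// # Example
///
/// A sketch of a generic full-harvest pass written against this trait; the
/// `harvest` helper is illustrative, not part of this crate:
///
/// ```
/// use ceres_core::traits::PortalClient;
/// use ceres_core::{AppError, NewDataset};
///
/// // List every dataset ID, fetch its metadata, and normalize it.
/// async fn harvest<P: PortalClient>(
///     client: &P,
///     portal_url: &str,
/// ) -> Result<Vec<NewDataset>, AppError> {
///     let ids = client.list_dataset_ids().await?;
///     let mut datasets = Vec::with_capacity(ids.len());
///     for id in ids {
///         let raw = client.get_dataset(&id).await?;
///         datasets.push(P::into_new_dataset(raw, portal_url));
///     }
///     Ok(datasets)
/// }
/// ```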
pub trait PortalClient: Send + Sync + Clone {
    /// Type representing raw portal data before transformation.
    type PortalData: Send;

    /// Lists all dataset IDs available on the portal.
    fn list_dataset_ids(&self) -> impl Future<Output = Result<Vec<String>, AppError>> + Send;

    /// Fetches detailed metadata for a specific dataset.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset identifier
    fn get_dataset(
        &self,
        id: &str,
    ) -> impl Future<Output = Result<Self::PortalData, AppError>> + Send;

    /// Converts portal-specific data into a normalized [`NewDataset`].
    ///
    /// # Arguments
    ///
    /// * `data` - The raw portal data
    /// * `portal_url` - The portal URL for source tracking
    fn into_new_dataset(data: Self::PortalData, portal_url: &str) -> NewDataset;

    /// Searches for datasets modified since the given timestamp.
    ///
    /// Used for incremental harvesting to fetch only recently modified datasets.
    /// Returns full dataset objects, so no separate `get_dataset` calls are needed.
    ///
    /// # Arguments
    ///
    /// * `since` - Only return datasets modified after this timestamp
    ///
    /// # Returns
    ///
    /// A vector of portal-specific dataset objects modified since the given time.
    /// Returns an error if the portal doesn't support incremental search.
    fn search_modified_since(
        &self,
        since: DateTime<Utc>,
    ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send;
}

/// Factory for creating portal clients.
///
/// Kept separate from [`PortalClient`] to avoid the pitfalls of async constructors in traits.
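///
/// # Example
///
/// A sketch of building one client per portal URL; `count_portal_datasets` is
/// a hypothetical helper, not part of this crate:
///
/// ```
/// use ceres_core::traits::{PortalClient, PortalClientFactory};
/// use ceres_core::AppError;
///
/// async fn count_portal_datasets<F: PortalClientFactory>(
///     factory: &F,
///     portal_url: &str,
/// ) -> Result<usize, AppError> {
///     // Construction is synchronous and fallible; the harvesting calls are async.
///     let client = factory.create(portal_url)?;
///     Ok(client.list_dataset_ids().await?.len())
/// }
/// ```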
pub trait PortalClientFactory: Send + Sync + Clone {
    /// The type of portal client this factory creates.
    type Client: PortalClient;

    /// Creates a new portal client for the given URL.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The portal API base URL
    fn create(&self, portal_url: &str) -> Result<Self::Client, AppError>;
}

/// Store for dataset persistence and retrieval.
///
/// Implementations handle database operations for datasets.
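///
/// # Example
///
/// A delta-detection sketch combining hash lookup with upsert; the
/// `refresh_or_upsert` helper and its hash arguments are illustrative,
/// not part of this crate:
///
/// ```
/// use ceres_core::traits::DatasetStore;
/// use ceres_core::{AppError, NewDataset};
///
/// async fn refresh_or_upsert<S: DatasetStore>(
///     store: &S,
///     portal_url: &str,
///     original_id: &str,
///     content_hash: &str,
///     dataset: &NewDataset,
/// ) -> Result<(), AppError> {
///     let known = store.get_hashes_for_portal(portal_url).await?;
///     match known.get(original_id) {
///         // Hash unchanged: only refresh the "last seen" timestamp.
///         Some(Some(stored)) if stored.as_str() == content_hash => {
///             store.update_timestamp_only(portal_url, original_id).await
///         }
///         // New or changed dataset: insert or update the full row.
///         _ => store.upsert(dataset).await.map(|_| ()),
///     }
/// }
/// ```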
pub trait DatasetStore: Send + Sync + Clone {
    /// Retrieves a dataset by its unique ID.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset's UUID
    ///
    /// # Returns
    ///
    /// The dataset if found, or `None` if it doesn't exist.
    fn get_by_id(&self, id: Uuid)
        -> impl Future<Output = Result<Option<Dataset>, AppError>> + Send;

    /// Retrieves content hashes for all datasets from a specific portal.
    ///
    /// Used for delta detection to determine which datasets need reprocessing.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// A map from `original_id` to optional `content_hash`.
    fn get_hashes_for_portal(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<HashMap<String, Option<String>>, AppError>> + Send;

    /// Updates only the timestamp for an unchanged dataset.
    ///
    /// Used when the content hash matches but we still want to track the "last seen" time.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_id` - The dataset's original ID from the portal
    fn update_timestamp_only(
        &self,
        portal_url: &str,
        original_id: &str,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Batch updates timestamps for multiple unchanged datasets.
    ///
    /// More efficient than calling `update_timestamp_only` for each dataset.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_ids` - Slice of dataset original IDs to update
    ///
    /// # Returns
    ///
    /// The number of rows actually updated.
    fn batch_update_timestamps(
        &self,
        portal_url: &str,
        original_ids: &[String],
    ) -> impl Future<Output = Result<u64, AppError>> + Send;

    /// Inserts or updates a dataset.
    ///
    /// # Arguments
    ///
    /// * `dataset` - The dataset to upsert
    ///
    /// # Returns
    ///
    /// The UUID of the affected row.
    fn upsert(&self, dataset: &NewDataset) -> impl Future<Output = Result<Uuid, AppError>> + Send;

    /// Performs vector similarity search.
    ///
    /// # Arguments
    ///
    /// * `query_vector` - The embedding vector to search for
    /// * `limit` - Maximum number of results
    ///
    /// # Returns
    ///
    /// Datasets ranked by similarity score (highest first).
    fn search(
        &self,
        query_vector: Vector,
        limit: usize,
    ) -> impl Future<Output = Result<Vec<SearchResult>, AppError>> + Send;

    /// Lists datasets as a stream with optional filtering.
    ///
    /// This method returns a stream of datasets for memory-efficient
    /// processing of large result sets. Unlike batch methods, it streams
    /// results directly from the database without loading everything into memory.
    ///
    /// # Arguments
    ///
    /// * `portal_filter` - Optional portal URL to filter by
    /// * `limit` - Optional maximum number of records
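    ///
    /// # Example
    ///
    /// A sketch of consuming the stream; `count_datasets` is a hypothetical
    /// helper, not part of this crate:
    ///
    /// ```
    /// use futures::StreamExt;
    /// use ceres_core::traits::DatasetStore;
    /// use ceres_core::AppError;
    ///
    /// async fn count_datasets<S: DatasetStore>(
    ///     store: &S,
    ///     portal: &str,
    /// ) -> Result<usize, AppError> {
    ///     let mut stream = store.list_stream(Some(portal), None);
    ///     let mut count = 0;
    ///     while let Some(row) = stream.next().await {
    ///         let _dataset = row?; // propagate row-level errors
    ///         count += 1;
    ///     }
    ///     Ok(count)
    /// }
    /// ```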
    fn list_stream<'a>(
        &'a self,
        portal_filter: Option<&'a str>,
        limit: Option<usize>,
    ) -> BoxStream<'a, Result<Dataset, AppError>>;

    /// Retrieves the last successful sync timestamp for a portal.
    ///
    /// Used for incremental harvesting to determine which datasets
    /// have been modified since the last sync.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// The timestamp of the last successful sync, or `None` if never synced.
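    ///
    /// # Example
    ///
    /// A sketch of choosing between incremental and full harvesting;
    /// `fetch_changes` is a hypothetical helper, not part of this crate:
    ///
    /// ```
    /// use ceres_core::traits::{DatasetStore, PortalClient};
    /// use ceres_core::AppError;
    ///
    /// async fn fetch_changes<P, S>(
    ///     portal: &P,
    ///     store: &S,
    ///     portal_url: &str,
    /// ) -> Result<Vec<P::PortalData>, AppError>
    /// where
    ///     P: PortalClient,
    ///     S: DatasetStore,
    /// {
    ///     match store.get_last_sync_time(portal_url).await? {
    ///         // Previously synced: fetch only datasets modified since then.
    ///         Some(since) => portal.search_modified_since(since).await,
    ///         // Never synced: fall back to a full listing.
    ///         None => {
    ///             let ids = portal.list_dataset_ids().await?;
    ///             let mut all = Vec::with_capacity(ids.len());
    ///             for id in ids {
    ///                 all.push(portal.get_dataset(&id).await?);
    ///             }
    ///             Ok(all)
    ///         }
    ///     }
    /// }
    /// ```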
    fn get_last_sync_time(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<Option<DateTime<Utc>>, AppError>> + Send;

    /// Records a sync status for a portal.
    ///
    /// Called after a harvest operation to update the sync status.
    /// The `sync_status` parameter indicates the outcome: "completed" or "cancelled".
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `sync_time` - The timestamp of this sync
    /// * `sync_mode` - Either "full" or "incremental"
    /// * `sync_status` - The outcome: "completed" or "cancelled"
    /// * `datasets_synced` - Number of datasets processed
    fn record_sync_status(
        &self,
        portal_url: &str,
        sync_time: DateTime<Utc>,
        sync_mode: &str,
        sync_status: &str,
        datasets_synced: i32,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Checks database connectivity.
    ///
    /// Performs a simple query to verify the database is reachable and responsive.
    /// Used by health check endpoints.
    fn health_check(&self) -> impl Future<Output = Result<(), AppError>> + Send;
}