ceres-core 0.4.0

Core types, harvesting logic, and services for Ceres
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
//! Trait definitions for external dependencies.
//!
//! This module defines traits that abstract over external dependencies
//! (embedding providers, portal clients, data stores), enabling:
//!
//! - **Testability**: Mock implementations for unit testing
//! - **Flexibility**: Different backend implementations (e.g., different embedding APIs)
//! - **Decoupling**: Core business logic doesn't depend on specific implementations
//!
//! # Example
//!
//! ```
//! use ceres_core::traits::{EmbeddingProvider, DatasetStore};
//!
//! // Business logic uses traits, not concrete types
//! async fn search_datasets<E, S>(
//!     embedding: &E,
//!     store: &S,
//!     query: &str,
//! ) -> Result<Vec<ceres_core::SearchResult>, ceres_core::AppError>
//! where
//!     E: EmbeddingProvider,
//!     S: DatasetStore,
//! {
//!     let vector: Vec<f32> = embedding.generate(query).await?;
//!     store.search(vector, 10).await
//! }
//! ```

use std::collections::HashMap;
use std::future::Future;

use chrono::{DateTime, Utc};
use futures::stream::{self, BoxStream};
use uuid::Uuid;

use crate::config::PortalType;
use crate::{AppError, Dataset, NewDataset, SearchResult};

/// Provider for generating text embeddings.
///
/// Implementations convert text into vector representations for semantic search.
/// Different providers may produce vectors of different dimensions:
/// - Gemini text-embedding-004: 768 dimensions
/// - OpenAI text-embedding-3-small: 1536 dimensions
/// - OpenAI text-embedding-3-large: 3072 dimensions
///
/// Only `name`, `dimension` and `generate` are required; `max_batch_size` and
/// `generate_batch` have single-item defaults that providers with a native
/// batch API are expected to override.
pub trait EmbeddingProvider: Send + Sync + Clone {
    /// Returns the provider identifier for logging and configuration.
    ///
    /// # Examples
    ///
    /// - `"gemini"` for Google Gemini
    /// - `"openai"` for OpenAI
    fn name(&self) -> &'static str;

    /// Returns the embedding dimension this provider generates.
    ///
    /// This value must match the database column dimension for vector storage.
    /// Mismatched dimensions will cause insertion failures.
    fn dimension(&self) -> usize;

    /// Generates an embedding vector for the given text.
    ///
    /// # Arguments
    ///
    /// * `text` - The text to embed
    ///
    /// # Returns
    ///
    /// A vector of floating-point values representing the text embedding.
    /// The vector length must equal `self.dimension()`.
    fn generate(&self, text: &str) -> impl Future<Output = Result<Vec<f32>, AppError>> + Send;

    /// Maximum number of texts supported per batch API call.
    ///
    /// The harvest pipeline uses `min(config.embedding_batch_size, max_batch_size())`
    /// to ensure batches never exceed provider limits.
    ///
    /// # Defaults
    ///
    /// Returns `1` (single-item batches). Providers with native batch support
    /// should override to enable efficient batching.
    fn max_batch_size(&self) -> usize {
        1
    }

    /// Generates embeddings for multiple texts in a batch.
    ///
    /// The default implementation awaits `generate()` once per text, in input
    /// order, one call at a time. Providers with native batch API support
    /// should override for efficiency.
    ///
    /// # Arguments
    ///
    /// * `texts` - Slice of texts to embed
    ///
    /// # Returns
    ///
    /// A vector of embedding vectors, one per input text and in the same order.
    /// An empty slice yields an empty vector.
    ///
    /// # Errors
    ///
    /// The default implementation stops at the first text whose `generate()`
    /// call fails and returns that error; later texts are not attempted.
    fn generate_batch(
        &self,
        texts: &[String],
    ) -> impl Future<Output = Result<Vec<Vec<f32>>, AppError>> + Send {
        // The returned future already borrows `self`, and the opaque return
        // type of a trait method captures the lifetime of `texts` too, so the
        // slice can be walked in place instead of being cloned up front.
        async move {
            let mut embeddings = Vec::with_capacity(texts.len());
            for text in texts {
                embeddings.push(self.generate(text).await?);
            }
            Ok(embeddings)
        }
    }
}

/// Client for accessing open data portals (CKAN, Socrata, etc.).
///
/// Implementations fetch dataset metadata from portal APIs and convert the
/// portal-specific representation (`Self::PortalData`) into a normalized
/// `NewDataset`.
///
/// `search_all_datasets`, `search_all_datasets_stream` and `dataset_count`
/// have default implementations; every other item must be provided by the
/// implementor.
pub trait PortalClient: Send + Sync + Clone {
    /// Type representing raw portal data before transformation.
    type PortalData: Send;

    /// Returns the portal type identifier (e.g., `"ckan"`, `"socrata"`, `"dcat"`).
    fn portal_type(&self) -> &'static str;

    /// Returns the base URL of the portal.
    fn base_url(&self) -> &str;

    /// Lists all dataset IDs available on the portal.
    fn list_dataset_ids(&self) -> impl Future<Output = Result<Vec<String>, AppError>> + Send;

    /// Fetches detailed metadata for a specific dataset.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset identifier
    fn get_dataset(
        &self,
        id: &str,
    ) -> impl Future<Output = Result<Self::PortalData, AppError>> + Send;

    /// Converts portal-specific data into a normalized `NewDataset`.
    ///
    /// This is an associated function (no `self`): the conversion consumes
    /// `data` and depends only on the arguments passed in.
    ///
    /// # Arguments
    ///
    /// * `data` - The raw portal data
    /// * `portal_url` - The portal URL for source tracking
    /// * `url_template` - Optional URL template with `{id}` and `{name}` placeholders
    /// * `language` - Preferred language for resolving multilingual fields
    fn into_new_dataset(
        data: Self::PortalData,
        portal_url: &str,
        url_template: Option<&str>,
        language: &str,
    ) -> NewDataset;

    /// Searches for datasets modified since the given timestamp.
    ///
    /// Used for incremental harvesting to fetch only recently modified datasets.
    /// Returns full dataset objects, eliminating the need for separate
    /// `get_dataset` calls.
    ///
    /// # Arguments
    ///
    /// * `since` - Only return datasets modified after this timestamp
    ///
    /// # Returns
    ///
    /// A vector of portal-specific dataset objects modified since the given time.
    /// Returns an error if the portal doesn't support incremental search.
    fn search_modified_since(
        &self,
        since: DateTime<Utc>,
    ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send;

    /// Fetches all datasets from the portal in bulk using paginated search.
    ///
    /// This is far more efficient than `list_dataset_ids()` + individual
    /// `get_dataset()` calls for large portals (e.g., HDX with ~40k datasets),
    /// as it avoids per-dataset HTTP requests and rate limiting.
    ///
    /// Returns full dataset objects ready for processing.
    ///
    /// # Default implementation
    ///
    /// The default does **not** fall back to `list_dataset_ids()` +
    /// `get_dataset()`. It performs no portal access and resolves to
    /// `AppError::Generic("search_all_datasets not supported")`. Portals with a
    /// bulk search API should override this method; callers that want a
    /// per-dataset fallback have to handle that error themselves.
    fn search_all_datasets(
        &self,
    ) -> impl Future<Output = Result<Vec<Self::PortalData>, AppError>> + Send {
        async {
            Err(AppError::Generic(
                "search_all_datasets not supported".to_string(),
            ))
        }
    }

    /// Streams all datasets from the portal page-by-page.
    ///
    /// Each stream item is a page of datasets (e.g., 1000 for CKAN, 100 for DCAT),
    /// matching natural pagination boundaries. This bounds peak memory to one page
    /// instead of accumulating the entire catalog.
    ///
    /// # Default implementation
    ///
    /// The default wraps `search_all_datasets()` as a single-item stream, so
    /// implementors that only provide `search_all_datasets()` work without
    /// changes. The whole result is delivered as one page. If
    /// `search_all_datasets()` is also left at its default, the stream yields a
    /// single `Err` item.
    fn search_all_datasets_stream(&self) -> BoxStream<'_, Result<Vec<Self::PortalData>, AppError>> {
        Box::pin(stream::once(self.search_all_datasets()))
    }

    /// Returns the total number of datasets on the portal.
    ///
    /// Used for progress reporting when streaming. Portals that support a cheap
    /// count query should override this.
    ///
    /// # Default implementation
    ///
    /// Resolves to `AppError::Generic("dataset_count not supported")`.
    fn dataset_count(&self) -> impl Future<Output = Result<usize, AppError>> + Send {
        async { Err(AppError::Generic("dataset_count not supported".to_string())) }
    }
}

/// Factory for creating portal clients.
///
/// Separate from `PortalClient` to avoid issues with async trait constructors.
pub trait PortalClientFactory: Send + Sync + Clone {
    /// The type of portal client this factory creates.
    type Client: PortalClient;

    /// Creates a new portal client from the given portal URL, portal type,
    /// language, and optional profile / SPARQL endpoint.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The portal API base URL
    /// * `portal_type` - The type of portal to create a client for
    /// * `language` - Preferred language for multilingual portals (e.g. "en", "fr")
    /// * `profile` - Optional profile for sub-dispatch (e.g. `"sparql"` for DCAT portals)
    /// * `sparql_endpoint` - Optional custom SPARQL endpoint URL (overrides `{url}/sparql`)
    ///
    /// # Errors
    ///
    /// Returns an `AppError` when the implementation cannot build a client
    /// from these arguments; the exact conditions are implementation-defined.
    fn create(
        &self,
        portal_url: &str,
        portal_type: PortalType,
        language: &str,
        profile: Option<&str>,
        sparql_endpoint: Option<&str>,
    ) -> Result<Self::Client, AppError>;
}

/// Store for dataset persistence and retrieval.
///
/// Implementations handle database operations for datasets. Every method is
/// required; the trait provides no default implementations.
pub trait DatasetStore: Send + Sync + Clone {
    /// Retrieves a dataset by its unique ID.
    ///
    /// # Arguments
    ///
    /// * `id` - The dataset's UUID
    ///
    /// # Returns
    ///
    /// `Some(dataset)` if a dataset with that ID exists, or `None` otherwise.
    fn get_by_id(&self, id: Uuid)
    -> impl Future<Output = Result<Option<Dataset>, AppError>> + Send;

    /// Retrieves content hashes for all datasets from a specific portal.
    ///
    /// Used for delta detection to determine which datasets need reprocessing.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// A map from `original_id` to an optional `content_hash`.
    fn get_hashes_for_portal(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<HashMap<String, Option<String>>, AppError>> + Send;

    /// Updates only the timestamp for an unchanged dataset.
    ///
    /// Used when the content hash matches but we want to track "last seen" time.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_id` - The dataset's original ID from the portal
    fn update_timestamp_only(
        &self,
        portal_url: &str,
        original_id: &str,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Batch updates timestamps for multiple unchanged datasets.
    ///
    /// More efficient than calling `update_timestamp_only` for each dataset.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `original_ids` - Slice of dataset original IDs to update
    ///
    /// # Returns
    ///
    /// The number of rows actually updated.
    fn batch_update_timestamps(
        &self,
        portal_url: &str,
        original_ids: &[String],
    ) -> impl Future<Output = Result<u64, AppError>> + Send;

    /// Marks datasets as stale if they were not seen during the latest full sync.
    ///
    /// After a successful full sync, any dataset whose `last_updated_at` is older
    /// than `sync_start` was not present in the portal's response.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `sync_start` - Timestamp recorded at the start of the sync
    ///
    /// # Returns
    ///
    /// The number of datasets newly marked as stale.
    fn mark_stale_datasets(
        &self,
        portal_url: &str,
        sync_start: DateTime<Utc>,
    ) -> impl Future<Output = Result<u64, AppError>> + Send;

    /// Marks datasets as stale if their `original_id` is NOT in the given set.
    ///
    /// This is more efficient than the timestamp-based approach
    /// (`mark_stale_datasets`) because it avoids updating every unchanged row
    /// just to compare timestamps later. Instead, stale datasets are identified
    /// directly by exclusion.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `seen_ids` - All `original_id`s seen during the current full sync
    ///
    /// # Returns
    ///
    /// The number of datasets newly marked as stale.
    fn mark_stale_by_exclusion(
        &self,
        portal_url: &str,
        seen_ids: &[String],
    ) -> impl Future<Output = Result<u64, AppError>> + Send;

    /// Inserts or updates a dataset.
    ///
    /// # Arguments
    ///
    /// * `dataset` - The dataset to upsert
    ///
    /// # Returns
    ///
    /// The UUID of the affected row.
    fn upsert(&self, dataset: &NewDataset) -> impl Future<Output = Result<Uuid, AppError>> + Send;

    /// Batch upserts multiple datasets in a single operation.
    ///
    /// Much faster than calling `upsert` in a loop because it reduces
    /// database round-trips and amortizes index update costs.
    ///
    /// # Arguments
    ///
    /// * `datasets` - Slice of datasets to upsert
    ///
    /// # Returns
    ///
    /// The UUIDs of all affected rows.
    fn batch_upsert(
        &self,
        datasets: &[NewDataset],
    ) -> impl Future<Output = Result<Vec<Uuid>, AppError>> + Send;

    /// Performs vector similarity search.
    ///
    /// # Arguments
    ///
    /// * `query_vector` - The embedding vector to search for
    /// * `limit` - Maximum number of results
    ///
    /// # Returns
    ///
    /// Datasets ranked by similarity score (highest first).
    fn search(
        &self,
        query_vector: Vec<f32>,
        limit: usize,
    ) -> impl Future<Output = Result<Vec<SearchResult>, AppError>> + Send;

    /// Lists datasets as a stream with optional filtering.
    ///
    /// This method returns a stream of datasets for memory-efficient
    /// processing of large result sets. Unlike batch methods, it streams
    /// results directly from the database without loading everything into memory.
    ///
    /// The returned stream borrows both `self` and `portal_filter` for `'a`,
    /// so both must outlive it.
    ///
    /// # Arguments
    ///
    /// * `portal_filter` - Optional portal URL to filter by
    /// * `limit` - Optional maximum number of records
    ///
    /// # Returns
    ///
    /// A stream whose items are individual datasets or the error hit while
    /// producing them.
    fn list_stream<'a>(
        &'a self,
        portal_filter: Option<&'a str>,
        limit: Option<usize>,
    ) -> BoxStream<'a, Result<Dataset, AppError>>;

    /// Retrieves the last successful sync timestamp for a portal.
    ///
    /// Used for incremental harvesting to determine which datasets
    /// have been modified since the last sync.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    ///
    /// # Returns
    ///
    /// The timestamp of the last successful sync, or `None` if never synced.
    fn get_last_sync_time(
        &self,
        portal_url: &str,
    ) -> impl Future<Output = Result<Option<DateTime<Utc>>, AppError>> + Send;

    /// Records a sync status for a portal.
    ///
    /// Called after a harvest operation to update the sync status.
    ///
    /// TODO(design): `sync_mode` / `sync_status` should be typed enums, not `&str`.
    ///
    /// # Arguments
    ///
    /// * `portal_url` - The source portal URL
    /// * `sync_time` - The timestamp of this sync
    /// * `sync_mode` - Either "full" or "incremental"
    /// * `sync_status` - The outcome: "completed" or "cancelled"
    /// * `datasets_synced` - Number of datasets processed
    fn record_sync_status(
        &self,
        portal_url: &str,
        sync_time: DateTime<Utc>,
        sync_mode: &str,
        sync_status: &str,
        datasets_synced: i32,
    ) -> impl Future<Output = Result<(), AppError>> + Send;

    /// Returns lowercased titles that appear across multiple portals.
    ///
    /// Used for cross-portal duplicate detection in Parquet exports.
    /// Typically returns ~21k titles (~2MB) for the full dataset.
    fn get_duplicate_titles(
        &self,
    ) -> impl Future<Output = Result<std::collections::HashSet<String>, AppError>> + Send;

    /// Lists datasets that have no embedding vector (`embedding IS NULL`).
    ///
    /// Used by [`crate::EmbeddingService`] to find datasets needing embedding generation.
    ///
    /// # Arguments
    ///
    /// * `portal_filter` - Optional portal URL to scope the query
    /// * `limit` - Optional maximum number of datasets to return
    fn list_pending_embeddings(
        &self,
        portal_filter: Option<&str>,
        limit: Option<usize>,
    ) -> impl Future<Output = Result<Vec<Dataset>, AppError>> + Send;

    /// Counts datasets with `embedding IS NULL`.
    ///
    /// Used for progress reporting in the embedding service.
    ///
    /// # Arguments
    ///
    /// * `portal_filter` - Optional portal URL to scope the count
    ///
    /// # Returns
    ///
    /// The number of matching datasets, as an `i64`.
    fn count_pending_embeddings(
        &self,
        portal_filter: Option<&str>,
    ) -> impl Future<Output = Result<i64, AppError>> + Send;

    /// Checks database connectivity.
    ///
    /// Performs a simple query to verify the database is reachable and responsive.
    /// Used by health check endpoints.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the check succeeds, or the `AppError` describing the failure.
    fn health_check(&self) -> impl Future<Output = Result<(), AppError>> + Send;
}