1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
//! FoldDB Core - Main database coordinator
//!
//! This module contains the main FoldDB struct that manages schemas, permissions, and data storage.
// Standard library imports
use std::path::Path;
use std::sync::Arc;
// External crate imports
use log::{debug, info};
// Internal crate imports
use crate::db_operations::{DbOperations, IndexResult};
use crate::logging::features::{log_feature, LogFeature};
use crate::schema::{SchemaCore, SchemaError};
use crate::storage::StorageError;
use crate::transform::manager::TransformManager;
// Infrastructure components that are used internally
// init_transform_manager removed
// SystemInitializationRequest removed
use super::infrastructure::{AsyncMessageBus, EventMonitor};
use super::mutation_manager::MutationManager;
use super::orchestration::index_status::IndexStatusTracker;
use super::orchestration::TransformOrchestrator;
use super::query::QueryExecutor;
use crate::progress::ProgressStore as JobStore;
/// The main database coordinator that manages schemas, permissions, and data storage.
///
/// All components are wired together once in `initialize_from_db_ops`; the
/// struct itself mostly delegates to the managers it holds.
pub struct FoldDB {
/// Schema management core; handles schema loading, lookup, and validation.
pub(crate) schema_manager: Arc<SchemaCore>,
/// Transform manager, shared with the orchestrator and event monitor.
pub(crate) transform_manager: Arc<TransformManager>,
/// Shared database operations with storage abstraction
pub(crate) db_ops: Arc<DbOperations>,
/// Query executor for handling all query operations
pub(crate) query_executor: QueryExecutor,
/// Message bus for event-driven communication
pub(crate) message_bus: Arc<AsyncMessageBus>,
/// Event monitor for system-wide observability
pub(crate) event_monitor: Arc<EventMonitor>,
/// Transform orchestrator for managing transform execution
/// Optional for backends that don't support orchestrator_tree (e.g., DynamoDB)
pub(crate) transform_orchestrator: Option<Arc<TransformOrchestrator>>,
/// Mutation manager for handling all mutation operations
pub(crate) mutation_manager: MutationManager,
}
impl FoldDB {
/// Retrieves or generates and persists the node identifier.
///
/// # Errors
/// Any backend failure is surfaced as [`crate::storage::StorageError::BackendError`].
pub async fn get_node_id(&self) -> Result<String, crate::storage::StorageError> {
    let result = self.db_ops.get_node_id().await;
    result.map_err(|e| crate::storage::StorageError::BackendError(e.to_string()))
}
/// Retrieves the list of permitted schemas for the given node.
///
/// Backend errors are swallowed: a failed lookup yields an empty list,
/// exactly like a node with no permissions.
pub async fn get_schema_permissions(&self, node_id: &str) -> Vec<String> {
    match self.db_ops.get_schema_permissions(node_id).await {
        Ok(schemas) => schemas,
        Err(_) => Vec::new(),
    }
}
/// Sets the permitted schemas for the given node.
pub async fn set_schema_permissions(
&self,
node_id: &str,
schemas: &[String],
) -> sled::Result<()> {
self.db_ops
.set_schema_permissions(node_id, schemas)
.await
.map_err(|e| sled::Error::Unsupported(e.to_string()))
}
/// Properly close and flush the database to release all file locks.
///
/// Note: no explicit flush is performed here. Flushing is critical for Sled,
/// but an explicit flush during Drop in an async context can cause
/// "runtime within runtime" panics, so we rely on the storage backend's own
/// drop/flush mechanisms (cloud backends auto-flush). In production, call
/// `flush()` explicitly before dropping `FoldDB`.
pub fn close(&self) -> Result<(), sled::Error> {
    log_feature!(
        LogFeature::Database,
        info,
        "Closing FoldDB and flushing all data to disk"
    );
    log_feature!(
        LogFeature::Database,
        debug,
        "FoldDB close() - relying on storage backend's own flush mechanisms"
    );
    Ok(())
}
/// Creates a new FoldDB instance with the specified storage path.
/// All initializations happen here. This is the main entry point for the FoldDB system.
/// Do not initialize anywhere else.
///
/// Fully async to support DbOperations with storage abstraction.
///
/// # Errors
/// Returns `StorageError::IoError` when the sled database cannot be opened
/// (including when the storage directory cannot be created).
pub async fn new(path: &str) -> Result<Self, StorageError> {
    let db = match sled::open(path) {
        Ok(db) => db,
        Err(e) if e.to_string().contains("No such file or directory") => {
            // The storage directory is missing. Create it before retrying;
            // retrying the open without corrective action would fail with
            // the exact same error.
            std::fs::create_dir_all(path).map_err(StorageError::IoError)?;
            sled::open(path)
                .map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))?
        }
        Err(e) => {
            return Err(StorageError::IoError(std::io::Error::other(e.to_string())));
        }
    };
    Self::initialize_from_db(db, path).await
}
/// Creates a new FoldDB instance from fully initialized components.
///
/// This is the most flexible constructor: the caller injects its own storage
/// layer (`db_ops`) and, optionally, a progress/job store implementation.
pub async fn new_with_components(
    db_ops: Arc<DbOperations>,
    db_path: &str,
    job_store: Option<Arc<dyn JobStore>>,
) -> Result<Self, StorageError> {
    Self::initialize_from_db_ops(db_ops, db_path, job_store).await
}
/// Shared setup path for Sled-backed construction: wraps an already-opened
/// sled database in the async storage abstraction, then hands off to the
/// common component wiring in `initialize_from_db_ops`.
async fn initialize_from_db(db: sled::Db, db_path: &str) -> Result<Self, StorageError> {
    log_feature!(
        LogFeature::Database,
        info,
        "🔄 Using DbOperations with storage abstraction layer (Sled backend)"
    );
    // Wrap the sled handle in the async storage abstraction.
    let ops = DbOperations::from_sled(db.clone()).await?;
    let db_ops = Arc::new(ops);
    log_feature!(
        LogFeature::Database,
        info,
        "✅ Storage abstraction active - using {} backend",
        "Sled"
    );
    // The local Sled backend gets an in-memory progress store so that job
    // tracking behaves consistently with cloud backends.
    let job_store = Arc::new(crate::progress::InMemoryProgressStore::new());
    Self::initialize_from_db_ops(db_ops, db_path, Some(job_store)).await
}
/// Common initialization logic that creates all FoldDB components from `DbOperations`.
///
/// Used by both `initialize_from_db` (Sled) and `new_with_components` (custom backends).
/// This is the single wiring point for the event-driven subsystems: the message bus is
/// created first, then each manager/orchestrator is constructed around it and its event
/// listener started. Construction order matters — later components hold `Arc`s to
/// earlier ones.
async fn initialize_from_db_ops(
db_ops: Arc<DbOperations>,
_db_path: &str,
job_store: Option<Arc<dyn JobStore>>,
) -> Result<Self, StorageError> {
// Initialize message bus — every component below communicates through it.
let message_bus = Arc::new(AsyncMessageBus::new());
// Create managers using direct initialization (the former event-driven
// SystemInitializationRequest flow was removed as dead code).
let transform_manager = Arc::new(
TransformManager::new(Arc::clone(&db_ops), Arc::clone(&message_bus))
.await
.map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))?,
);
let schema_manager = Arc::new(
SchemaCore::new(Arc::clone(&db_ops), Arc::clone(&message_bus))
.await
.map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))?,
);
// Create and start EventMonitor for system-wide observability.
let event_monitor = Arc::new(
EventMonitor::new(
Arc::clone(&message_bus),
Arc::clone(&transform_manager),
job_store.clone()
).await,
);
info!("Started EventMonitor for system-wide event tracking");
// Create QueryExecutor for handling all query operations.
let query_executor = QueryExecutor::new(Arc::clone(&db_ops), Arc::clone(&schema_manager));
info!("Created QueryExecutor for query operations");
// Create and start BackfillManager (event-driven backfill tracking); it
// shares the tracker owned by the EventMonitor.
use super::orchestration::backfill_manager::BackfillManager;
let backfill_tracker = event_monitor.get_backfill_tracker();
let backfill_manager = BackfillManager::new(backfill_tracker);
backfill_manager
.start_event_listener(Arc::clone(&message_bus))
.await;
info!("Started BackfillManager for event-driven backfill tracking");
// Create TransformOrchestrator for managing transform execution.
// The presence of `orchestrator_tree` selects the Sled (sync) path; other
// backends (e.g. DynamoDB) fall through to the async, store-based path.
let transform_orchestrator = if let Some(orchestrator_tree) =
db_ops.orchestrator_tree.clone()
{
// Sled backend - use sync version
let orchestrator = Arc::new(TransformOrchestrator::new(
Arc::clone(&transform_manager),
orchestrator_tree,
Arc::clone(&message_bus),
Arc::clone(&db_ops),
));
// Start the event listener to drive transforms
orchestrator
.start_event_listener(Arc::clone(&message_bus))
.await;
info!("Created and started TransformOrchestrator (Sled backend)");
Some(orchestrator)
} else {
// DynamoDB or other backends - use async version with orchestrator_store
let orchestrator_store = db_ops.orchestrator_store().inner().clone();
match TransformOrchestrator::new_with_store(
Arc::clone(&transform_manager),
orchestrator_store,
Arc::clone(&message_bus),
Arc::clone(&db_ops),
)
.await
{
Ok(orchestrator) => {
// Start the event listener to drive transforms
orchestrator
.start_event_listener(Arc::clone(&message_bus))
.await;
info!("Created and started TransformOrchestrator (KvStore backend)");
Some(Arc::new(orchestrator))
}
Err(e) => {
// Orchestrator creation failure is non-fatal: FoldDB still works,
// but automatic transform execution is unavailable.
log_feature!(
LogFeature::Database,
warn,
"⚠️ Failed to create TransformOrchestrator: {}. Transforms will have limited functionality.",
e
);
None
}
}
};
// Create shared IndexStatusTracker for tracking indexing progress.
// Shared between MutationManager (reads status) and IndexOrchestrator (writes status).
let index_status_tracker = IndexStatusTracker::new(job_store.clone());
// Create and start IndexOrchestrator for event-driven native indexing.
use super::orchestration::index_orchestrator::IndexOrchestrator;
let index_orchestrator = Arc::new(IndexOrchestrator::new(
Arc::clone(&db_ops),
Some(index_status_tracker.clone()),
));
index_orchestrator
.start_event_listener(Arc::clone(&message_bus))
.await;
info!("Started IndexOrchestrator for event-driven native indexing");
// Create MutationManager for handling all mutation operations.
let mutation_manager = MutationManager::new(
Arc::clone(&db_ops),
Arc::clone(&schema_manager),
Arc::clone(&message_bus),
Some(index_status_tracker.clone()),
);
info!("Created MutationManager for mutation operations");
// Start the MutationManager event listener. The result is deliberately
// ignored (best-effort startup). NOTE(review): confirm that swallowing a
// listener startup failure here is acceptable.
let _ = mutation_manager.start_event_listener().await;
info!("Started MutationManager event listener");
// AtomManager operates via direct method calls, not event consumption.
// Event-driven components:
// - EventMonitor: System observability and statistics
// - TransformOrchestrator: Automatic transform triggering based on field changes
// - IndexEventHandler: Background indexing for improved mutation performance
// - MutationManager: handles MutationRequest events
Ok(Self {
schema_manager,
transform_manager,
db_ops,
query_executor,
message_bus,
event_monitor,
transform_orchestrator,
mutation_manager,
})
}
/// Flushes local storage to ensure all data is persisted
pub async fn flush(&self) -> Result<(), StorageError> {
self.db_ops
.flush()
.await
.map_err(|e| StorageError::IoError(std::io::Error::other(e.to_string())))
}
// ========== INDEXING STATUS API ==========
/// Returns the current indexing status, as reported by the mutation manager.
pub async fn get_indexing_status(&self) -> super::orchestration::IndexingStatus {
    let manager = &self.mutation_manager;
    manager.get_indexing_status().await
}
/// Returns `true` while indexing is in progress (delegates to the mutation manager).
pub async fn is_indexing(&self) -> bool {
    let manager = &self.mutation_manager;
    manager.is_indexing().await
}
// ========== CONSOLIDATED SCHEMA API - DELEGATES TO SCHEMA_CORE ==========
/// Load schema from JSON string (creates Available schema)
pub async fn load_schema_from_json(&mut self, json_str: &str) -> Result<(), SchemaError> {
// Delegate to SchemaCore implementation
self.schema_manager.load_schema_from_json(json_str).await
}
/// Load schema from file (creates Available schema)
pub async fn load_schema_from_file<P: AsRef<Path>>(
&mut self,
path: P,
) -> Result<(), SchemaError> {
// Delegate to SchemaCore implementation
self.schema_manager.load_schema_from_file(path).await
}
/// Provides a shared handle to the underlying database operations.
pub fn get_db_ops(&self) -> Arc<DbOperations> {
    self.db_ops.clone()
}
/// Returns the current event statistics collected by the event monitor.
pub fn get_event_statistics(&self) -> super::infrastructure::event_monitor::EventStatistics {
    let monitor = &self.event_monitor;
    monitor.get_statistics()
}
/// Returns a shared handle to the backfill tracker owned by the event monitor.
pub fn get_backfill_tracker(
    &self,
) -> Arc<super::infrastructure::backfill_tracker::BackfillTracker> {
    let monitor = &self.event_monitor;
    monitor.get_backfill_tracker()
}
/// Returns information about every known backfill, completed or not.
pub fn get_all_backfills(&self) -> Vec<super::infrastructure::backfill_tracker::BackfillInfo> {
    let monitor = &self.event_monitor;
    monitor.get_all_backfills()
}
/// Returns only the backfills that are currently in progress.
pub fn get_active_backfills(
    &self,
) -> Vec<super::infrastructure::backfill_tracker::BackfillInfo> {
    let monitor = &self.event_monitor;
    monitor.get_active_backfills()
}
/// Looks up backfill information for a single transform, if any exists.
pub fn get_backfill(
    &self,
    transform_id: &str,
) -> Option<super::infrastructure::backfill_tracker::BackfillInfo> {
    let monitor = &self.event_monitor;
    monitor.get_backfill(transform_id)
}
/// Returns a shared handle to the message bus, for publishing events (testing).
pub fn message_bus(&self) -> Arc<AsyncMessageBus> {
    self.message_bus.clone()
}
/// Returns a shared handle to the transform manager (testing).
pub fn transform_manager(&self) -> Arc<TransformManager> {
    self.transform_manager.clone()
}
/// Returns a shared handle to the schema manager (testing).
pub fn schema_manager(&self) -> Arc<SchemaCore> {
    self.schema_manager.clone()
}
/// Searches the native word index for a single term.
///
/// # Errors
/// Returns `SchemaError::InvalidData` when no native index manager is configured.
pub fn native_word_search(&self, term: &str) -> Result<Vec<IndexResult>, SchemaError> {
    match self.db_ops.native_index_manager() {
        Some(manager) => manager.search_word(term),
        None => Err(SchemaError::InvalidData(
            "Native index manager not available".to_string(),
        )),
    }
}
/// Searches the native index across all classification types and aggregates
/// the results; field-name matches are included via `search_word`.
///
/// # Errors
/// Returns `SchemaError::InvalidData` when no native index manager is configured.
pub async fn native_search_all_classifications(
    &self,
    term: &str,
) -> Result<Vec<IndexResult>, SchemaError> {
    debug!(
        "FoldDB: native_search_all_classifications called for term: '{}'",
        term
    );
    // Guard clause: bail out early when the index manager is absent.
    let Some(manager) = self.db_ops.native_index_manager() else {
        return Err(SchemaError::InvalidData(
            "Native index manager not available".to_string(),
        ));
    };
    // Async store (DynamoDB) takes the async path; Sled uses the sync path.
    if manager.is_async() {
        manager.search_all_classifications_async(term).await
    } else {
        manager.search_all_classifications(term)
    }
}
/// Returns the transform orchestrator, or `None` when the backend does not
/// support one (e.g. when DynamoDB orchestrator creation failed).
pub fn transform_orchestrator(&self) -> Option<Arc<TransformOrchestrator>> {
    self.transform_orchestrator.clone()
}
/// Borrows the mutation manager (testing).
pub fn mutation_manager(&self) -> &MutationManager {
    &self.mutation_manager
}
/// Mutably borrows the mutation manager (testing).
pub fn mutation_manager_mut(&mut self) -> &mut MutationManager {
    &mut self.mutation_manager
}
}