moltendb_core/engine/mod.rs
// ─── engine/mod.rs ────────────────────────────────────────────────────────────
// This is the root module of the database engine. It defines the `Db` struct —
// the central object that the rest of the application interacts with.
//
// The Db struct is a thin, cloneable handle to the shared database state.
// Cloning a Db is cheap — it just increments reference counts on the Arcs
// inside. All clones share the same underlying data, so any write made through
// one clone is immediately visible through all others. This is how Axum handler
// functions can each receive their own Db clone via State<> extraction while
// all operating on the same in-memory database.
//
// Internal structure:
//   state         — the document data: collection → (key → hot value or cold pointer)
//   storage       — the persistence layer (disk, encrypted, or OPFS)
//   tx            — broadcast channel for real-time WebSocket notifications
//   indexes       — field indexes for fast WHERE queries
//   query_heatmap — tracks query frequency for auto-indexing
//
// The Db struct has two constructors:
//   open()      — native (server) build, opens a disk file
//   open_wasm() — WASM (browser) build, opens an OPFS file
// Both are conditionally compiled with #[cfg(...)] attributes.
// ─────────────────────────────────────────────────────────────────────────────

// Declare the sub-modules of the engine.
mod types;      // LogEntry, DbError, DocumentState, RecordPointer
mod indexing;   // index_doc, unindex_doc, track_query, create_index
mod storage;    // StorageBackend trait + concrete implementations
mod config;     // DbConfig struct
#[cfg(feature = "schema")]
mod schema;     // JSON Schema validation
mod operations; // get, get_all, insert_batch, update, delete, etc.

// Re-export core types so they can be used by tests and other crates.
pub use types::{DbError, LogEntry};
// Re-export DbConfig.
pub use config::DbConfig;
// Re-export the StorageBackend trait so callers can use it without knowing
// the internal module structure.
pub use storage::{StorageBackend, EncryptedStorage};
#[cfg(not(target_arch = "wasm32"))]
pub use storage::{AsyncDiskStorage, SyncDiskStorage};

// DashMap = concurrent hash map. DashSet = concurrent hash set.
use dashmap::{DashMap, DashSet};
use tracing::info;
// Value = dynamically-typed JSON value.
use serde_json::Value;
// Standard HashMap — used for return values from get operations.
use std::collections::HashMap;
// ControlFlow lets the log-streaming callbacks signal either "keep going"
// (Continue) or "stop early" (Break).
use std::ops::ControlFlow;
// Arc = thread-safe reference-counted pointer.
// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
// the same underlying data.
use std::sync::Arc;
// Tokio's broadcast channel: one sender, many receivers.
// Used to push real-time change notifications to WebSocket subscribers.
use tokio::sync::broadcast;

/// The central database handle. Cheap to clone — all clones share the same state.
///
/// This struct is the public API of the engine. All database operations go
/// through methods on this struct, which delegate to the operations module.
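///
/// A sketch of the Axum pattern described in the module header (the handler
/// signature and router wiring are illustrative, not part of this crate):
///
/// ```ignore
/// use axum::extract::{Path, State};
/// use axum::Json;
///
/// async fn get_doc(
///     State(db): State<Db>, // each request gets a cheap clone of the handle
///     Path((collection, key)): Path<(String, String)>,
/// ) -> Json<Option<serde_json::Value>> {
///     Json(db.get(&collection, &key)) // all clones see the same data
/// }
/// ```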
#[derive(Clone)]
pub struct Db {
    /// The main document store.
    /// Outer map: collection name (e.g. "users") → inner map.
    /// Inner map: document key (e.g. "u1") → Hybrid Hot/Cold document state.
    /// DashMap allows concurrent reads and writes from multiple threads.
    state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,

    /// The storage backend — handles persistence to disk or OPFS.
    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
    pub storage: Arc<dyn StorageBackend>,

    /// Broadcast channel sender for real-time change notifications.
    /// When a document is inserted, updated, or deleted, a JSON event is sent
    /// on this channel. WebSocket handlers subscribe to receive these events.
    /// `pub` so the WebSocket handler in main.rs can call subscribe().
    pub tx: broadcast::Sender<String>,

    /// The index store.
    /// Key format: "collection:field" (e.g. "users:role").
    /// Value: field_value → set of document keys with that value.
    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
    /// `pub` so handlers.rs can check for index existence directly.
    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,

    /// Query frequency counter for auto-indexing.
    /// Key: "collection:field". Value: number of times queried.
    /// When a field reaches 3 queries, an index is auto-created.
    pub query_heatmap: Arc<DashMap<String, u32>>,

    /// The maximum number of documents per collection to keep in RAM (Hot).
    /// If a collection exceeds this, older documents are paged out to disk (Cold).
    /// Default is 50,000.
    pub hot_threshold: usize,

    /// Max requests per window.
    pub rate_limit_requests: u32,

    /// Window size in seconds.
    pub rate_limit_window: u64,

    /// Maximum request body size in bytes.
    pub max_body_size: usize,

    /// Maximum keys allowed per request.
    pub max_keys_per_request: usize,

    /// Registered JSON schemas per collection.
    /// Key: collection name → Value: (Original JSON, Compiled Validator).
    #[cfg(feature = "schema")]
    pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,

    /// Optional shell command to execute after a successful backup.
    /// Supports the {SNAPSHOT_PATH} placeholder.
    pub post_backup_script: Option<String>,

    /// Whether tiered (hot+cold) storage mode is active.
    pub tiered_mode: bool,

    /// Timestamp of when this Db instance was opened, used for uptime calculation.
    pub started_at: std::time::Instant,
}

impl Db {
    /// Open (or create) a database at the given file path.
    /// Only available on native (non-WASM) builds.
    ///
    /// `config.sync_mode` — if true, use SyncDiskStorage (flush on every write).
    ///                      if false, use AsyncDiskStorage (flush every 50ms).
    ///                      Ignored when `tiered_mode` is true.
    /// `config.tiered_mode` — if true, use TieredStorage (hot + cold two-tier backend).
    ///                        Hot writes go to the active log; cold data is archived and
    ///                        read via mmap on startup. Best for large datasets (100k+ docs).
    ///                        Enable with the STORAGE_MODE=tiered environment variable.
    /// `config.encryption_key` — if Some, wrap the storage in EncryptedStorage.
    ///                           if None, data is stored in plaintext (not recommended).
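    ///
    /// # Example
    ///
    /// A minimal sketch. It assumes `DbConfig` can be built as a plain struct
    /// literal and implements `Default`; see the config module for the actual
    /// construction API.
    ///
    /// ```ignore
    /// let db = Db::open(DbConfig {
    ///     path: "data/app.db".to_string(),
    ///     sync_mode: false,  // AsyncDiskStorage: buffered writes, 50ms flush
    ///     tiered_mode: false,
    ///     encryption_key: Some("change-me".to_string()),
    ///     ..Default::default()
    /// })?;
    /// ```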
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open(config: DbConfig) -> Result<Self, DbError> {
        let path = &config.path;
        let sync_mode = config.sync_mode;
        let tiered_mode = config.tiered_mode;
        let hot_threshold = config.hot_threshold;
        let rate_limit_requests = config.rate_limit_requests;
        let rate_limit_window = config.rate_limit_window;
        let max_body_size = config.max_body_size;
        let max_keys_per_request = config.max_keys_per_request;
        let encryption_key = config.encryption_key;
        let post_backup_script = config.post_backup_script;

        // Create the shared in-memory state containers.
        let state = Arc::new(DashMap::new());
        // Create the broadcast channel with a buffer of 100 messages.
        // A subscriber that falls more than 100 messages behind misses the
        // oldest ones and observes a Lagged error on its next recv().
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());
        #[cfg(feature = "schema")]
        let schemas = Arc::new(DashMap::new());

        // Ensure the parent directory exists.
        if let Some(parent) = std::path::Path::new(path).parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Choose the base storage backend based on the configured mode.
        //
        // tiered_mode = true → TieredStorage: hot log (async writes) + cold log
        //                      (mmap reads). Best for large datasets. The cold log
        //                      accumulates promoted hot data and is paged by the OS.
        //
        // sync_mode = true   → SyncDiskStorage: every write is flushed to disk
        //                      immediately. Zero data loss, lower throughput.
        //
        // default            → AsyncDiskStorage: writes buffered in memory, flushed
        //                      every 50ms. Highest throughput, up to 50ms data loss.
        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
            Arc::new(storage::TieredStorage::new(path)?)
        } else if sync_mode {
            Arc::new(storage::SyncDiskStorage::new(path)?)
        } else {
            Arc::new(storage::AsyncDiskStorage::new(path)?)
        };

        // Optionally wrap the base storage in EncryptedStorage.
        // EncryptedStorage is transparent — it encrypts on write and decrypts
        // on read, so the rest of the engine doesn't know encryption is happening.
        let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
            Arc::new(storage::EncryptedStorage::new(base_storage, &key))
        } else {
            base_storage
        };

        // Replay the log (or snapshot + delta) into the in-memory state.
        // After this call, `state` and `indexes` reflect the persisted data.
        storage::stream_into_state(
            &*storage,
            &state,
            &indexes,
            #[cfg(feature = "schema")] &schemas,
        )?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
            max_keys_per_request,
            #[cfg(feature = "schema")]
            schemas,
            post_backup_script,
            tiered_mode,
            started_at: std::time::Instant::now(),
        })
    }

    /// Open (or create) a database in the browser using OPFS.
    /// Only available on WASM builds. Async because OPFS APIs return Promises.
    ///
    /// `config.path` — the filename in the OPFS root directory (e.g. "analytics_db").
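    ///
    /// # Example
    ///
    /// A browser-side sketch; direct field access on `DbConfig` is assumed
    /// here and error handling is elided.
    ///
    /// ```ignore
    /// let mut config = DbConfig::default(); // assumes DbConfig: Default
    /// config.path = "analytics_db".to_string();
    /// let db = Db::open_wasm(config).await?;
    /// ```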
    #[cfg(target_arch = "wasm32")]
    pub async fn open_wasm(config: DbConfig) -> Result<Self, DbError> {
        let db_name = &config.path;
        let hot_threshold = config.hot_threshold;
        let rate_limit_requests = config.rate_limit_requests;
        let rate_limit_window = config.rate_limit_window;
        let max_body_size = config.max_body_size;
        let max_keys_per_request = config.max_keys_per_request;
        let encryption_key = config.encryption_key;
        let sync_mode = config.sync_mode;
        let post_backup_script = config.post_backup_script;

        let state = Arc::new(DashMap::new());
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());
        #[cfg(feature = "schema")]
        let schemas = Arc::new(DashMap::new());

        // Open the OPFS file. This is async because the browser's OPFS API
        // uses Promises which we must await.
        let mut storage: Arc<dyn StorageBackend> =
            Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);

        // Apply encryption wrapper if a key is provided.
        if let Some(key) = encryption_key {
            storage = Arc::new(storage::EncryptedStorage::new(storage, &key));
        }

        // Replay the log into the in-memory state.
        storage::stream_into_state(
            &*storage,
            &state,
            &indexes,
            #[cfg(feature = "schema")] &schemas,
        )?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
            max_keys_per_request,
            #[cfg(feature = "schema")]
            schemas,
            post_backup_script,
            tiered_mode: config.tiered_mode,
            started_at: std::time::Instant::now(),
        })
    }

    /// Returns the number of hot (in-memory) documents across all collections.
    /// Cold entries are only pointers into the log, so they are not counted.
    pub fn hot_keys_count(&self) -> usize {
        self.state
            .iter()
            .map(|c| {
                c.value()
                    .iter()
                    .filter(|d| matches!(d.value(), types::DocumentState::Hot(_)))
                    .count()
            })
            .sum()
    }

    /// Create a new broadcast receiver for real-time change notifications.
    /// Each call returns an independent receiver — multiple WebSocket handlers
    /// can each subscribe and receive all events independently.
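    ///
    /// A sketch of a subscriber loop (the exact event payload is whatever the
    /// operations module serializes; it is treated here as an opaque string):
    ///
    /// ```ignore
    /// let mut rx = db.subscribe();
    /// tokio::spawn(async move {
    ///     while let Ok(event) = rx.recv().await {
    ///         println!("change event: {event}");
    ///     }
    /// });
    /// ```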
    pub fn subscribe(&self) -> broadcast::Receiver<String> {
        self.tx.subscribe()
    }

    /// Retrieve a single document by key. Returns None if not found.
    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
        operations::get(&self.state, &self.storage, collection, key)
    }

    /// Retrieve all documents in a collection as a HashMap.
    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
        operations::get_all(&self.state, &self.storage, collection)
    }

    /// Retrieve a specific set of documents by their keys.
    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
        operations::get_batch(&self.state, &self.storage, collection, keys)
    }

    /// Insert or overwrite multiple documents in one call.
    /// Each item is a (key, value) pair. Writes are persisted to storage.
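    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// use serde_json::json;
    ///
    /// db.insert_batch("users", vec![
    ///     ("u1".to_string(), json!({ "name": "Ada",  "role": "admin" })),
    ///     ("u2".to_string(), json!({ "name": "Alan", "role": "user" })),
    /// ])?;
    /// assert_eq!(db.get("users", "u1").unwrap()["role"], "admin");
    /// ```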
    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
        operations::insert_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            #[cfg(feature = "schema")] &self.schemas,
            collection,
            items,
        )?;

        // Auto-evict if the collection exceeds the threshold.
        let _ = self.evict_collection(collection, self.hot_threshold);
        Ok(())
    }

    /// Partially update a document — merges `updates` into the existing document.
    /// Returns true if the document was found and updated, false if not found.
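    ///
    /// For example, merging `{"role": "admin"}` into `{"name": "Ada", "role": "user"}`
    /// yields `{"name": "Ada", "role": "admin"}`: fields absent from `updates`
    /// are preserved.
    ///
    /// ```ignore
    /// let found = db.update("users", "u1", serde_json::json!({ "role": "admin" }))?;
    /// assert!(found);
    /// ```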
    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
        let updated = operations::update(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            #[cfg(feature = "schema")] &self.schemas,
            collection,
            key,
            updates,
        )?;

        if updated {
            // Auto-evict if the collection exceeds the threshold.
            let _ = self.evict_collection(collection, self.hot_threshold);
        }
        Ok(updated)
    }

    /// Delete a single document by key.
    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
        operations::delete(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            key,
        )
    }

    /// Delete multiple documents by key in one call.
    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
        operations::delete_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            keys,
        )
    }

    /// Drop an entire collection — removes all documents and its indexes.
    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
        operations::delete_collection(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
        )
    }

    /// Track that `field` was queried in `collection` and auto-create an index
    /// if this field has been queried 3 or more times.
    /// Errors are silently ignored — auto-indexing is best-effort.
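    ///
    /// A sketch of the intended call pattern (the query handler is assumed to
    /// call this once per WHERE lookup):
    ///
    /// ```ignore
    /// db.track_query("users", "role");
    /// db.track_query("users", "role");
    /// db.track_query("users", "role"); // third hit: "users:role" index is built
    /// assert!(db.indexes.contains_key("users:role"));
    /// ```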
    pub fn track_query(&self, collection: &str, field: &str) {
        // The `let _ =` discards the Result — a failed auto-index is not fatal.
        let _ = indexing::track_query(
            &self.indexes,
            &self.query_heatmap,
            collection,
            field,
            &self.storage,
            &self.state,
        );
    }

    /// Register a JSON schema for a collection.
    /// All subsequent writes to this collection must conform to this schema.
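    ///
    /// A sketch with a small schema (any JSON Schema document accepted by the
    /// `jsonschema` crate works):
    ///
    /// ```ignore
    /// db.set_schema("users", serde_json::json!({
    ///     "type": "object",
    ///     "required": ["name"],
    ///     "properties": { "name": { "type": "string" } }
    /// }))?;
    /// // A document without "name" should now be rejected on write.
    /// let result = db.insert_batch("users", vec![("u9".to_string(), serde_json::json!({}))]);
    /// assert!(result.is_err());
    /// ```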
    #[cfg(feature = "schema")]
    pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
        schema::set_schema(
            &self.schemas,
            &self.storage,
            &self.tx,
            collection,
            schema,
        )
    }
416 /// Compact the log file — rewrite it to contain only the current state.
417 ///
418 /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
419 /// and writes a binary snapshot for fast next startup.
420 ///
421 /// The compacted log contains:
422 /// - One INSERT entry per live document (current value only).
423 /// - One INDEX entry per registered index (index data is rebuilt on replay).
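    ///
    /// Typically driven by a timer or an admin endpoint:
    ///
    /// ```ignore
    /// db.compact()?;
    /// ```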
    pub fn compact(&self) -> Result<(), DbError> {
        info!("🔨 Starting Log Compaction...");

        // Build the minimal set of entries representing the current state.
        let mut entries = Vec::new();

        // One INSERT per live document across all collections.
        for col_ref in self.state.iter() {
            let col_name = col_ref.key();
            for item_ref in col_ref.value().iter() {
                // To compact, we need the full Value. If it's Cold, we fetch it from storage.
                let entry = match item_ref.value() {
                    crate::engine::types::DocumentState::Hot(v) => {
                        types::LogEntry::new(
                            "INSERT".to_string(),
                            col_name.clone(),
                            item_ref.key().clone(),
                            v.clone(),
                        )
                    }
                    crate::engine::types::DocumentState::Cold(ptr) => {
                        // The bytes at the pointer are the original serialized
                        // LogEntry, so they deserialize straight into one.
                        let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
                        serde_json::from_slice(&bytes)?
                    }
                };
                entries.push(entry);
            }
        }

        // One SCHEMA entry per collection.
        #[cfg(feature = "schema")]
        for schema_ref in self.schemas.iter() {
            let col_name = schema_ref.key();
            let (schema_json, _) = &**schema_ref.value();
            entries.push(types::LogEntry::new(
                "SCHEMA".to_string(),
                col_name.clone(),
                "".to_string(),
                schema_json.clone(),
            ));
        }

        // One INDEX entry per registered index.
        // The index name format is "collection:field" — we split it to get both parts.
        for index_ref in self.indexes.iter() {
            let parts: Vec<&str> = index_ref.key().split(':').collect();
            if parts.len() == 2 {
                entries.push(types::LogEntry::new(
                    "INDEX".to_string(),
                    parts[0].to_string(), // collection name
                    parts[1].to_string(), // field name
                    serde_json::json!(null),
                ));
            }
        }

        // Delegate the actual file rewrite (and snapshot write) to the storage backend.
        self.storage.compact_with_hook(entries.clone(), self.post_backup_script.clone())?;

        // After compaction the log is rewritten and all old RecordPointers are invalid.
        // Promote every Cold entry in the in-memory state to Hot so subsequent reads
        // don't try to seek to stale byte offsets in the now-truncated log file.
        for entry in &entries {
            if entry.cmd == "INSERT" {
                if let Some(col) = self.state.get(&entry.collection) {
                    if let Some(mut doc) = col.get_mut(&entry.key) {
                        if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
                            *doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
                        }
                    }
                }
            }
        }

        info!("✅ Log Compaction Finished!");
        Ok(())
    }

    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
    ///
    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
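    ///
    /// ```ignore
    /// // Keep at most 10,000 documents of "events" hot in RAM:
    /// let evicted = db.evict_collection("events", 10_000)?;
    /// tracing::info!("paged {evicted} documents out to cold storage");
    /// ```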
    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
        let col_len = if let Some(col) = self.state.get(collection) {
            col.len()
        } else {
            return Err(DbError::CollectionNotFound);
        };

        if col_len <= limit {
            return Ok(0);
        }

        let mut evicted_count = 0;
        let mut offset = 0u64;
        let to_evict = col_len - limit;

        // To evict properly, we need the pointers. Since we don't store them for
        // Hot documents, we re-scan the log and remember the *latest* INSERT for
        // each key; taking the first match could leave a pointer to a superseded
        // version of the document. (v1 assumption: the newest INSERT carries the
        // document's current value, which holds right after compaction.)
        let mut latest: HashMap<String, crate::engine::types::RecordPointer> = HashMap::new();
        self.storage.stream_log_into(&mut |entry, length| {
            if entry.collection == collection && entry.cmd == "INSERT" {
                latest.insert(
                    entry.key.clone(),
                    crate::engine::types::RecordPointer { offset, length },
                );
            }
            // Advance past this record plus its one-byte delimiter.
            offset += (length + 1) as u64;
            ControlFlow::Continue(())
        })?;

        // Demote Hot documents to Cold pointers until we're back under the limit.
        if let Some(col) = self.state.get(collection) {
            for (key, ptr) in latest {
                if evicted_count >= to_evict {
                    break;
                }
                if let Some(mut doc_state) = col.get_mut(&key) {
                    if matches!(*doc_state, crate::engine::types::DocumentState::Hot(_)) {
                        *doc_state = crate::engine::types::DocumentState::Cold(ptr);
                        evicted_count += 1;
                    }
                }
            }
        }

        Ok(evicted_count)
    }

    /// Recover the database state to a specific point in time or sequence number.
    /// Returns the recovered state as a Vec of LogEntries that can be written to a snapshot.
    ///
    /// This is a utility function used by the CLI for PITR (point-in-time recovery).
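    ///
    /// A sketch of CLI-style usage; writing the resulting entries out to a
    /// snapshot is up to the caller, and the cutoff value must be in the same
    /// units as `LogEntry::_t`:
    ///
    /// ```ignore
    /// let storage = SyncDiskStorage::new("data/app.db")?;
    /// // Replay everything logged up to the cutoff timestamp:
    /// let entries = Db::recover_to(&storage, Some(cutoff_ts), None)?;
    /// ```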
    #[cfg(not(target_arch = "wasm32"))]
    pub fn recover_to(
        storage: &dyn StorageBackend,
        to_time: Option<u64>,
        to_seq: Option<u64>,
    ) -> Result<Vec<LogEntry>, DbError> {
        let state: DashMap<String, DashMap<String, crate::engine::types::DocumentState>> = DashMap::new();
        let indexes: DashMap<String, DashMap<String, DashSet<String>>> = DashMap::new();
        #[cfg(feature = "schema")]
        let schemas: DashMap<String, Arc<(serde_json::Value, jsonschema::Validator)>> = DashMap::new();

        let mut offset = 0u64;
        let mut count = 0u64;
        let mut current_tx_entries = Vec::new();
        let mut current_tx_id = None;

        storage.stream_log_into(&mut |entry, length| {
            // Condition 1: stop once we pass the requested timestamp.
            if let Some(t) = to_time {
                if entry._t > t {
                    return ControlFlow::Break(());
                }
            }

            // Condition 2: stop once we have replayed `to_seq` entries.
            if let Some(s) = to_seq {
                if count >= s {
                    return ControlFlow::Break(());
                }
            }

            let pointer = crate::engine::types::RecordPointer {
                offset,
                length,
            };

            match entry.cmd.as_str() {
                "TX_BEGIN" => {
                    current_tx_id = Some(entry.key.clone());
                    current_tx_entries.clear();
                }
                "TX_COMMIT" => {
                    // Only apply the buffered entries if the commit matches the
                    // transaction we are currently collecting.
                    if current_tx_id.as_ref() == Some(&entry.key) {
                        for (e, p) in current_tx_entries.drain(..) {
                            crate::engine::storage::apply_entry(
                                &e,
                                &state,
                                &indexes,
                                #[cfg(feature = "schema")] &schemas,
                                Some(p),
                            );
                        }
                        current_tx_id = None;
                    }
                }
                _ => {
                    if current_tx_id.is_some() {
                        // Inside a transaction: buffer until TX_COMMIT.
                        current_tx_entries.push((entry, pointer));
                    } else {
                        crate::engine::storage::apply_entry(
                            &entry,
                            &state,
                            &indexes,
                            #[cfg(feature = "schema")] &schemas,
                            Some(pointer),
                        );
                    }
                }
            }

            count += 1;
            // Advance past this record plus its one-byte delimiter.
            offset += (length + 1) as u64;
            ControlFlow::Continue(())
        })?;

        // Convert the recovered state into LogEntries (same shape as compact()).
        let mut entries = Vec::new();
        for col_ref in state.iter() {
            let col_name = col_ref.key();
            for item_ref in col_ref.value().iter() {
                let entry = match item_ref.value() {
                    crate::engine::types::DocumentState::Hot(v) => {
                        LogEntry::new(
                            "INSERT".to_string(),
                            col_name.clone(),
                            item_ref.key().clone(),
                            v.clone(),
                        )
                    }
                    crate::engine::types::DocumentState::Cold(ptr) => {
                        // If the cold record can't be read or parsed, degrade to
                        // a null document instead of aborting the whole recovery.
                        let bytes = storage.read_at(ptr.offset, ptr.length).unwrap_or_default();
                        serde_json::from_slice(&bytes).unwrap_or_else(|_| {
                            LogEntry::new("INSERT".to_string(), col_name.clone(), item_ref.key().clone(), serde_json::Value::Null)
                        })
                    }
                };
                entries.push(entry);
            }
        }

        #[cfg(feature = "schema")]
        for schema_ref in schemas.iter() {
            let col_name = schema_ref.key();
            let (schema_json, _) = &**schema_ref.value();
            entries.push(LogEntry::new(
                "SCHEMA".to_string(),
                col_name.clone(),
                "".to_string(),
                schema_json.clone(),
            ));
        }

        for index_ref in indexes.iter() {
            let parts: Vec<&str> = index_ref.key().split(':').collect();
            if parts.len() == 2 {
                entries.push(LogEntry::new(
                    "INDEX".to_string(),
                    parts[0].to_string(),
                    parts[1].to_string(),
                    serde_json::json!(null),
                ));
            }
        }

        Ok(entries)
    }
}