1#![cfg_attr(not(feature = "std"), no_std)]
2
3use core::ptr::NonNull;
4use crate::table::Defer;
5
6pub mod types;
8pub mod config;
9pub mod table;
10pub mod index;
11pub mod transaction;
12pub mod memory;
13pub mod platform;
14pub mod monitor;
15pub mod sql;
16#[cfg(feature = "pubsub")]
17pub mod pubsub;
18#[cfg(feature = "ha")]
19pub mod ha;
20pub mod time_series;
21
22pub use types::{DataType, FieldDef, TableDef, Value, Result, RemDbError, IndexType, MAX_STRING_LEN};
24pub use table::MemoryTable;
25pub use index::{PrimaryIndex, SecondaryIndex, BTreeIndex, TTreeIndex, IndexStats, AnySecondaryIndex, PrimaryIndexItem};
26pub use transaction::{Transaction, TransactionType, TransactionManager};
27pub use monitor::{DbMetrics, DbMetricsSnapshot, HealthStatus, HealthCheckResult};
28pub use time_series::{TimeSeriesTable, TimeSeriesTableDef, TimeSeriesRecord, TimeSeriesConfig, TimeSeriesIndex, CompressionType};
29
30pub use remdb_macros::table;
32pub use remdb_macros::database;
33pub use remdb_macros::MemdbTable;
34
35extern crate alloc;
37use alloc::boxed::Box;
38use alloc::sync::Arc;
39use alloc::vec::Vec;
40use alloc::string::String;
41use alloc::string::ToString;
42
/// Per-field constraint flags that can be passed to [`DdlExecutor::create_table`].
///
/// Constraints are matched to fields by position in the constraint slice.
pub struct FieldConstraint {
    /// Whether the field is flagged as the primary key.
    pub primary_key: bool,
    /// Whether the field must always hold a value.
    pub not_null: bool,
    /// Whether values of the field must be unique.
    pub unique: bool,
    /// Whether the field is requested to auto-increment.
    pub auto_increment: bool,
}
54
55pub trait DdlExecutor {
57 fn create_table(
59 &mut self,
60 name: &str,
61 fields: &[(&str, DataType, Option<Value>)],
62 constraints: Option<&[FieldConstraint]>,
63 primary_key: Option<usize>
64 ) -> Result<()>;
65
66 fn create_index(
68 &mut self,
69 table_name: &str,
70 field_name: &str,
71 index_type: IndexType
72 ) -> Result<()>;
73
74 fn create_time_series_table(
76 &mut self,
77 name: &str,
78 time_field: &str,
79 value_field: &str,
80 tag_fields: &[&str],
81 config: Option<TimeSeriesConfig>
82 ) -> Result<()>;
83}
84
85pub struct RemDb {
87 pub config: &'static config::DbConfig,
89 tables: Vec<Option<MemoryTable>>,
91 time_series_tables: Vec<Option<time_series::TimeSeriesTable>>,
93 primary_indices: Vec<Option<PrimaryIndex>>,
95 secondary_indices: Vec<Option<AnySecondaryIndex>>,
97 low_power_mode: bool,
99 low_power_memory_limit: usize,
101 pub snapshot_version: u32,
103 pub metrics: monitor::DbMetrics,
105}
106
107unsafe impl Send for RemDb {}
110unsafe impl Sync for RemDb {}
111
112impl Drop for RemDb {
114 fn drop(&mut self) {
115 #[cfg(feature = "ha")]
117 if let Err(_e) = crate::ha::shutdown() {
118 }
120 }
121}
122
123impl RemDb {
/// Magic number at the start of every snapshot file (the ASCII bytes
/// `REMD` read as a big-endian `u32`; it is stored little-endian on disk).
const SNAPSHOT_MAGIC: u32 = 0x5245_4D44;
/// Snapshot file format version written by `save_snapshot` and required
/// by `restore_snapshot`.
const SNAPSHOT_VERSION: u32 = 1;
128
129 pub fn new(
131 config: &'static config::DbConfig
132 ) -> Self {
133 let low_power_memory_limit = if config.low_power_mode_supported {
135 (config.total_memory / 2).max(1024 * 1024) } else {
138 config.total_memory
139 };
140
141 let metrics = monitor::DbMetrics::new(config.total_memory);
143
144 let tables = Vec::with_capacity(config.tables.len());
146 let time_series_tables = Vec::new();
147 let primary_indices = Vec::with_capacity(config.tables.len());
148 let secondary_indices = Vec::with_capacity(config.tables.len());
149
150 RemDb {
151 config,
152 tables,
153 time_series_tables,
154 primary_indices,
155 secondary_indices,
156 low_power_mode: false, low_power_memory_limit,
158 snapshot_version: 0, metrics,
160 }
161 }
162
163 pub fn get_table(&self, table_id: usize) -> Result<&MemoryTable> {
165 if table_id >= self.tables.len() {
166 return Err(RemDbError::RecordNotFound);
167 }
168
169 match &self.tables[table_id] {
170 Some(table) => Ok(table),
171 None => Err(RemDbError::RecordNotFound),
172 }
173 }
174
175 pub fn get_table_mut(&mut self, table_id: usize) -> Result<&mut MemoryTable> {
177 if table_id >= self.tables.len() {
178 return Err(RemDbError::RecordNotFound);
179 }
180
181 match &mut self.tables[table_id] {
182 Some(table) => Ok(table),
183 None => Err(RemDbError::RecordNotFound),
184 }
185 }
186
187 pub fn get_primary_index(&self, table_id: usize) -> Result<&PrimaryIndex> {
189 if table_id >= self.primary_indices.len() {
190 return Err(RemDbError::RecordNotFound);
191 }
192
193 match &self.primary_indices[table_id] {
194 Some(index) => Ok(index),
195 None => Err(RemDbError::RecordNotFound),
196 }
197 }
198
199 pub fn get_primary_index_mut(&mut self, table_id: usize) -> Result<&mut PrimaryIndex> {
201 if table_id >= self.primary_indices.len() {
202 return Err(RemDbError::RecordNotFound);
203 }
204
205 match &mut self.primary_indices[table_id] {
206 Some(index) => Ok(index),
207 None => Err(RemDbError::RecordNotFound),
208 }
209 }
210
211 pub fn get_secondary_index(&self, table_id: usize) -> Result<&AnySecondaryIndex> {
213 if table_id >= self.secondary_indices.len() {
214 return Err(RemDbError::RecordNotFound);
215 }
216
217 match &self.secondary_indices[table_id] {
218 Some(index) => Ok(index),
219 None => Err(RemDbError::RecordNotFound),
220 }
221 }
222
223 pub fn get_secondary_index_mut(&mut self, table_id: usize) -> Result<&mut AnySecondaryIndex> {
225 if table_id >= self.secondary_indices.len() {
226 return Err(RemDbError::RecordNotFound);
227 }
228
229 match &mut self.secondary_indices[table_id] {
230 Some(index) => Ok(index),
231 None => Err(RemDbError::RecordNotFound),
232 }
233 }
234
235 pub fn is_low_power_mode(&self) -> bool {
237 self.low_power_mode
238 }
239
240 pub fn enter_low_power_mode(&mut self) -> Result<()> {
242 if !self.config.low_power_mode_supported {
244 return Err(RemDbError::UnsupportedOperation);
245 }
246
247 if self.low_power_mode {
249 return Ok(());
250 }
251
252 unsafe {
254 let current_memory = self.config.total_memory;
260 if current_memory > self.low_power_memory_limit {
261 self.optimize_memory_usage();
264 }
265
266 crate::transaction::set_low_power_mode(true);
268 }
269
270 for table in &mut self.tables.iter_mut() {
272 if let Some(table) = table {
273 table.set_low_power_mode(true, self.config.low_power_max_records);
274 }
275 }
276
277 self.low_power_mode = true;
279
280 Ok(())
281 }
282
283 fn optimize_memory_usage(&mut self) {
285 for table in &mut self.tables.iter_mut() {
291 if let Some(table) = table {
292 }
295 }
296
297 for ts_table in &mut self.time_series_tables.iter_mut() {
299 if let Some(ts_table) = ts_table {
300 }
303 }
304
305 }
311
312 pub fn exit_low_power_mode(&mut self) -> Result<()> {
314 if !self.low_power_mode {
316 return Ok(());
317 }
318
319 unsafe {
321 crate::transaction::set_low_power_mode(false);
327 }
328
329 for table in &mut self.tables.iter_mut() {
331 if let Some(table) = table {
332 table.set_low_power_mode(false, None);
333 }
334 }
335
336 self.low_power_mode = false;
338
339 Ok(())
340 }
341
342 pub unsafe fn begin_transaction(
344 &mut self,
345 tx_type: transaction::TransactionType,
346 isolation_level: transaction::IsolationLevel,
347 tx_buffer: *mut transaction::Transaction,
348 log_buffer: *mut transaction::LogItem,
349 max_log_items: usize
350 ) -> Result<NonNull<transaction::Transaction>> {
351 crate::transaction::TX_MANAGER.begin(tx_type, isolation_level, tx_buffer, log_buffer, max_log_items)
352 }
353
354 pub unsafe fn commit_transaction(&mut self) -> Result<()> {
356 crate::transaction::TX_MANAGER.commit()
357 }
358
359 pub unsafe fn rollback_transaction(&mut self) -> Result<()> {
361 crate::transaction::TX_MANAGER.rollback(self)
362 }
363
364 pub fn init(&mut self) -> Result<()> {
366 if crate::platform::PLATFORM.get().is_none() {
368 #[cfg(feature = "posix")]
370 crate::platform::init_platform(crate::platform::posix::get_posix_platform());
371
372 #[cfg(not(feature = "posix"))]
374 #[cfg(feature = "baremetal")] crate::platform::init_platform(crate::platform::baremetal::get_baremetal_platform());
375 }
376
377 #[cfg(feature = "std")]
380 {
381 use crate::transaction::LogManager;
384 use std::path::Path;
385
386 let log_dir = self.config.wal_config.log_path;
388 let wal_file_path = format!("{}/remdb.wal", log_dir);
389
390 #[cfg(feature = "std")]
392 {
393 let log_path = Path::new(log_dir);
394 if !log_path.exists() {
395 std::fs::create_dir_all(log_path).unwrap_or(());
396 }
397 }
398
399 unsafe {
400 match crate::platform::file_open(wal_file_path.as_str(), crate::platform::FileMode::ReadWrite) {
402 Ok(handle) if !handle.is_null() => {
403 let _ = crate::platform::file_close(handle);
405 let log_manager = LogManager::new(self.config)?;
406 crate::transaction::set_log_manager(log_manager);
407 },
408 _ => {
409 }
411 }
412 }
413 }
414
415 #[cfg(feature = "ha")]
417 if let Err(_e) = crate::ha::init(self.config) {
418 }
421
422 Ok(())
423 }
424
425 pub fn save_snapshot(&mut self, path: &str) -> Result<()> {
427 let handle = crate::platform::file_open(path, crate::platform::FileMode::Write)
429 .map_err(|_| RemDbError::FileIoError)?;
430
431 let _defer = Defer::new(|| {
433 let _ = crate::platform::file_close(handle);
434 });
435
436 self.snapshot_version += 1;
438
439 let magic = Self::SNAPSHOT_MAGIC.to_le_bytes();
441 let written = crate::platform::file_write(handle, magic.as_ptr(), magic.len())
442 .map_err(|_| RemDbError::FileIoError)?;
443 if written != magic.len() {
444 return Err(RemDbError::FileIoError);
445 }
446
447 let version = Self::SNAPSHOT_VERSION.to_le_bytes();
449 let written = crate::platform::file_write(handle, version.as_ptr(), version.len())
450 .map_err(|_| RemDbError::FileIoError)?;
451 if written != version.len() {
452 return Err(RemDbError::FileIoError);
453 }
454
455 let snapshot_type = 0u8;
457 let written = crate::platform::file_write(handle, &snapshot_type as *const u8, 1)
458 .map_err(|_| RemDbError::FileIoError)?;
459 if written != 1 {
460 return Err(RemDbError::FileIoError);
461 }
462
463 let version_bytes = self.snapshot_version.to_le_bytes();
465 let written = crate::platform::file_write(handle, version_bytes.as_ptr(), version_bytes.len())
466 .map_err(|_| RemDbError::FileIoError)?;
467 if written != version_bytes.len() {
468 return Err(RemDbError::FileIoError);
469 }
470
471 let table_count = self.config.tables.len() as u32;
473 let table_count_bytes = table_count.to_le_bytes();
474 let written = crate::platform::file_write(handle, table_count_bytes.as_ptr(), table_count_bytes.len())
475 .map_err(|_| RemDbError::FileIoError)?;
476 if written != table_count_bytes.len() {
477 return Err(RemDbError::FileIoError);
478 }
479
480 for table_id in 0..table_count as usize {
482 if let Some(table) = &mut self.tables[table_id] {
483 table.snapshot_version = self.snapshot_version;
485
486 let table_id_u32 = table_id as u32;
488 let table_id_bytes = table_id_u32.to_le_bytes();
489 let written = crate::platform::file_write(handle, table_id_bytes.as_ptr(), table_id_bytes.len())
490 .map_err(|_| RemDbError::FileIoError)?;
491 if written != table_id_bytes.len() {
492 return Err(RemDbError::FileIoError);
493 }
494
495 let used_count_u32 = table.record_count() as u32;
497 let used_count_bytes = used_count_u32.to_le_bytes();
498 let written = crate::platform::file_write(handle, used_count_bytes.as_ptr(), used_count_bytes.len())
499 .map_err(|_| RemDbError::FileIoError)?;
500 if written != used_count_bytes.len() {
501 return Err(RemDbError::FileIoError);
502 }
503
504 let mut record_size = 0;
506 for field in table.def.fields {
507 record_size += field.size;
508 }
509
510 for i in 0..table.def.max_records {
512 let status_ptr = unsafe { table.get_status_ptr(i) };
513 if unsafe { (*status_ptr).status } == crate::types::RecordStatus::Used {
514 let index_u32 = i as u32;
516 let index_bytes = index_u32.to_le_bytes();
517 let written = crate::platform::file_write(handle, index_bytes.as_ptr(), index_bytes.len())
518 .map_err(|_| RemDbError::FileIoError)?;
519 if written != index_bytes.len() {
520 return Err(RemDbError::FileIoError);
521 }
522
523 let record_ptr = unsafe { table.get_record_ptr(i) };
525 let written = crate::platform::file_write(handle, record_ptr, record_size)
526 .map_err(|_| RemDbError::FileIoError)?;
527 if written != record_size {
528 return Err(RemDbError::FileIoError);
529 }
530 }
531 }
532 }
533 }
534
535 Ok(())
537 }
538
539 pub fn restore_snapshot(&mut self, path: &str) -> Result<()> {
541 let handle = crate::platform::file_open(path, crate::platform::FileMode::Read)
543 .map_err(|_| RemDbError::FileIoError)?;
544
545 let _defer = Defer::new(|| {
547 let _ = crate::platform::file_close(handle);
548 });
549
550 let mut magic_bytes = [0u8; 4];
552 let read = crate::platform::file_read(handle, magic_bytes.as_mut_ptr(), magic_bytes.len())
553 .map_err(|_| RemDbError::FileIoError)?;
554 if read != magic_bytes.len() {
555 return Err(RemDbError::FileIoError);
556 }
557 let magic = u32::from_le_bytes(magic_bytes);
558 if magic != Self::SNAPSHOT_MAGIC {
559 return Err(RemDbError::SnapshotFormatError);
560 }
561
562 let mut version_bytes = [0u8; 4];
564 let read = crate::platform::file_read(handle, version_bytes.as_mut_ptr(), version_bytes.len())
565 .map_err(|_| RemDbError::FileIoError)?;
566 if read != version_bytes.len() {
567 return Err(RemDbError::FileIoError);
568 }
569 let version = u32::from_le_bytes(version_bytes);
570 if version != Self::SNAPSHOT_VERSION {
571 return Err(RemDbError::SnapshotFormatError);
572 }
573
574 let mut snapshot_type_bytes = [0u8; 1];
576 let read = crate::platform::file_read(handle, snapshot_type_bytes.as_mut_ptr(), snapshot_type_bytes.len())
577 .map_err(|_| RemDbError::FileIoError)?;
578 if read != snapshot_type_bytes.len() {
579 return Err(RemDbError::FileIoError);
580 }
581 let snapshot_type = snapshot_type_bytes[0];
582
583 let mut base_version_bytes = [0u8; 4];
585 let read = crate::platform::file_read(handle, base_version_bytes.as_mut_ptr(), base_version_bytes.len())
586 .map_err(|_| RemDbError::FileIoError)?;
587 if read != base_version_bytes.len() {
588 return Err(RemDbError::FileIoError);
589 }
590 let base_version = u32::from_le_bytes(base_version_bytes);
591
592 let mut table_count_bytes = [0u8; 4];
594 let read = crate::platform::file_read(handle, table_count_bytes.as_mut_ptr(), table_count_bytes.len())
595 .map_err(|_| RemDbError::FileIoError)?;
596 if read != table_count_bytes.len() {
597 return Err(RemDbError::FileIoError);
598 }
599 let table_count = u32::from_le_bytes(table_count_bytes) as usize;
600
601 if table_count != self.config.tables.len() {
603 return Err(RemDbError::SnapshotFormatError);
604 }
605
606 for _ in 0..table_count {
608 let mut table_id_bytes = [0u8; 4];
610 let read = crate::platform::file_read(handle, table_id_bytes.as_mut_ptr(), table_id_bytes.len())
611 .map_err(|_| RemDbError::FileIoError)?;
612 if read != table_id_bytes.len() {
613 return Err(RemDbError::FileIoError);
614 }
615 let table_id = u32::from_le_bytes(table_id_bytes) as usize;
616
617 if table_id >= self.tables.len() {
619 return Err(RemDbError::SnapshotFormatError);
620 }
621
622 let table = match &mut self.tables[table_id] {
624 Some(table) => table,
625 None => return Err(RemDbError::SnapshotFormatError),
626 };
627
628 let mut record_count_bytes = [0u8; 4];
630 let read = crate::platform::file_read(handle, record_count_bytes.as_mut_ptr(), record_count_bytes.len())
631 .map_err(|_| RemDbError::FileIoError)?;
632 if read != record_count_bytes.len() {
633 return Err(RemDbError::FileIoError);
634 }
635 let record_count = u32::from_le_bytes(record_count_bytes) as usize;
636
637 let mut record_size = 0;
639 for field in table.def.fields {
640 record_size += field.size;
641 }
642
643 if snapshot_type == 0 {
644 for i in 0..table.def.max_records {
646 let status_ptr = unsafe { table.get_status_ptr(i) };
647 let record_ptr = unsafe { table.get_record_ptr_mut(i) };
648
649 unsafe {
650 (*status_ptr).status = crate::types::RecordStatus::Free;
651 (*status_ptr).version += 1;
652 crate::platform::memset(record_ptr, 0, table.def.record_size);
653 }
654 }
655
656 unsafe {
658 table.set_record_count(0);
659 }
660 }
661
662 for _ in 0..record_count {
664 let mut index_bytes = [0u8; 4];
666 let read = crate::platform::file_read(handle, index_bytes.as_mut_ptr(), index_bytes.len())
667 .map_err(|_| RemDbError::FileIoError)?;
668 if read != index_bytes.len() {
669 return Err(RemDbError::FileIoError);
670 }
671 let i = u32::from_le_bytes(index_bytes) as usize;
672
673 if i >= table.def.max_records {
675 return Err(RemDbError::SnapshotFormatError);
676 }
677
678 let record_ptr = unsafe { table.get_record_ptr_mut(i) };
680 let read = crate::platform::file_read(handle, record_ptr, record_size)
681 .map_err(|_| RemDbError::FileIoError)?;
682 if read != record_size {
683 return Err(RemDbError::FileIoError);
684 }
685
686 let status_ptr = unsafe { table.get_status_ptr(i) };
688 let current_status = unsafe { &mut *status_ptr };
689
690 if current_status.status != crate::types::RecordStatus::Used {
691 unsafe {
693 table.inc_record_count();
694 }
695 }
696
697 current_status.status = crate::types::RecordStatus::Used;
698 current_status.version += 1;
699 }
700 }
701
702 self.snapshot_version = base_version;
704
705 Ok(())
707 }
708
709 pub fn get_metrics(&self) -> &monitor::DbMetrics {
711 &self.metrics
712 }
713
714 pub fn metrics_snapshot(&self) -> monitor::DbMetricsSnapshot {
716 let snapshot = self.metrics.snapshot();
717
718 #[cfg(feature = "pubsub")]
720 if let Some(topic_id) = crate::pubsub::get_topic_id(crate::pubsub::topics::METRICS_TOPIC) {
721 let metrics_bytes = snapshot.to_bytes();
722 let _ = crate::pubsub::publish(topic_id, &metrics_bytes);
723 }
724
725 snapshot
726 }
727
728 pub fn reset_metrics(&self) {
730 self.metrics.reset()
731 }
732
733 pub fn health_check(&self) -> monitor::HealthCheckResult {
735 let health_result = { let metrics = self.metrics.snapshot();
737
738 let memory_usage = metrics.used_memory as f64 / metrics.total_memory as f64;
740
741 let (status, details) = if memory_usage > 0.9 {
742 (monitor::HealthStatus::Unhealthy, alloc::string::String::from("内存使用率过高"))
743 } else if memory_usage > 0.7 {
744 (monitor::HealthStatus::Warning, alloc::string::String::from("内存使用率较高"))
745 } else {
746 (monitor::HealthStatus::Healthy, alloc::string::String::from("数据库运行正常"))
747 };
748
749 monitor::HealthCheckResult::new(status, metrics, details)
750 };
751
752 #[cfg(feature = "pubsub")]
754 if let Some(topic_id) = crate::pubsub::get_topic_id(crate::pubsub::topics::HEALTH_STATUS_TOPIC) {
755 let health_bytes = health_result.to_bytes();
756 let _ = crate::pubsub::publish(topic_id, &health_bytes);
757 }
758
759 health_result
760 }
761}
762
763impl DdlExecutor for RemDb {
765 fn create_table(
766 &mut self,
767 name: &str,
768 fields: &[(&str, DataType, Option<Value>)],
769 constraints: Option<&[FieldConstraint]>,
770 primary_key: Option<usize>
771 ) -> Result<()> {
772 if fields.is_empty() {
774 return Err(RemDbError::ConfigError);
775 }
776
777 if let Some(pk_index) = primary_key {
779 if pk_index >= fields.len() {
780 return Err(RemDbError::ConfigError);
781 }
782 }
783
784 let mut field_defs = Vec::new();
786 let mut offset = 0;
787 let mut record_size = 0;
788
789 for (i, (field_name, data_type, default_value)) in fields.iter().enumerate() {
790 let field_size = match data_type {
792 DataType::String => MAX_STRING_LEN,
793 _ => data_type.size(),
794 };
795
796 let field_name_static = Box::leak(field_name.to_string().into_boxed_str());
798
799 let is_primary_key = primary_key == Some(i);
801
802 let default_constraint = FieldConstraint {
804 primary_key: is_primary_key,
805 not_null: is_primary_key,
806 unique: is_primary_key,
807 auto_increment: false,
808 };
809 let constraint = constraints
810 .and_then(|c| c.get(i))
811 .unwrap_or(&default_constraint);
812
813 let is_auto_increment = constraint.auto_increment &&
814 (data_type == &DataType::Int32 || data_type == &DataType::Int64 ||
815 data_type == &DataType::UInt32 || data_type == &DataType::UInt64);
816
817 let final_not_null = is_primary_key || constraint.not_null;
819
820 let final_unique = is_primary_key || constraint.unique;
822
823 let field_def = FieldDef {
825 name: field_name_static,
826 data_type: *data_type,
827 size: field_size,
828 offset,
829 primary_key: is_primary_key, not_null: final_not_null, unique: final_unique, auto_increment: is_auto_increment, default_value: *default_value, };
835
836 field_defs.push(field_def);
837
838 offset += field_size;
840 record_size += field_size;
841 }
842
843 let table_name_static = Box::leak(name.to_string().into_boxed_str());
846 let field_defs_static = Box::leak(field_defs.into_boxed_slice());
847
848 let table_def = TableDef {
849 id: self.tables.len() as u8,
850 name: table_name_static,
851 fields: field_defs_static,
852 primary_key: primary_key.unwrap_or(0),
853 secondary_index: None,
854 secondary_index_type: IndexType::SortedArray,
855 record_size,
856 max_records: self.config.default_max_records,
857 };
858
859 let table_def_arc = alloc::sync::Arc::new(table_def);
861 let table = MemoryTable::new(table_def_arc.clone())?;
862
863 self.tables.push(Some(table));
865
866 let hash_table_size = (table_def.max_records * 2).next_power_of_two(); let index_memory_size = PrimaryIndex::calculate_memory_size(&table_def, hash_table_size, table_def.max_records);
870
871 let index_memory = crate::memory::allocator::alloc(index_memory_size)?;
873 let hash_table_start = index_memory.as_ptr() as *mut Option<NonNull<PrimaryIndexItem>>;
874 let items_start = (index_memory.as_ptr() as usize + hash_table_size * core::mem::size_of::<Option<NonNull<PrimaryIndexItem>>>()) as *mut PrimaryIndexItem;
875
876 let primary_index = unsafe {
878 PrimaryIndex::new(
879 table_def_arc.clone(),
880 hash_table_start,
881 items_start,
882 hash_table_size,
883 table_def.max_records
884 )
885 };
886 self.primary_indices.push(Some(primary_index));
887
888 self.secondary_indices.push(None);
890
891 #[cfg(feature = "pubsub")]
893 let table_creation_msg = alloc::format!("CREATE:table={},id={},fields={}",
894 table_name_static,
895 table_def.id,
896 table_def.fields.len());
897
898 #[cfg(feature = "pubsub")]
899 if let Some(topic_id) = crate::pubsub::get_topic_id(crate::pubsub::topics::TABLES_TOPIC) {
900 let _ = crate::pubsub::publish(topic_id, table_creation_msg.as_bytes());
901 }
902
903 unsafe {
905 if let Some(log_manager) = crate::transaction::TX_MANAGER.get_log_manager_mut() {
907 let mut log_data = [0u8; 512];
909 let name_bytes = table_name_static.as_bytes();
911 let name_len = core::cmp::min(name_bytes.len(), 64);
912 log_data[0] = name_len as u8;
913 log_data[1..1+name_len].copy_from_slice(&name_bytes[..name_len]);
914 log_data[65] = table_def.fields.len() as u8;
916 log_data[66] = table_def.primary_key as u8;
918
919 let mut offset = 67;
921 for (i, field) in table_def.fields.iter().enumerate() {
922 let field_name = field.name;
924 let field_name_bytes = field_name.as_bytes();
925 let field_name_len = core::cmp::min(field_name_bytes.len(), 32);
926 log_data[offset] = field_name_len as u8;
927 offset += 1;
928 log_data[offset..offset+field_name_len].copy_from_slice(&field_name_bytes[..field_name_len]);
929 offset += 32; log_data[offset] = field.data_type as u8;
933 offset += 1;
934
935 let mut constraints = 0u8;
937 if field.primary_key { constraints |= 0b0001; }
938 if field.not_null { constraints |= 0b0010; }
939 if field.unique { constraints |= 0b0100; }
940 if field.auto_increment { constraints |= 0b1000; }
941 log_data[offset] = constraints;
942 offset += 1;
943
944 let has_default = field.default_value.is_some();
946 log_data[offset] = has_default as u8;
947 offset += 1;
948
949 if let Some(default_value) = field.default_value {
951 match field.data_type {
953 crate::types::DataType::Bool => {
954 let b = default_value.bool;
955 log_data[offset] = b as u8;
956 offset += 1;
957 },
958 crate::types::DataType::Int8 => {
959 let i = default_value.i8;
960 log_data[offset] = i as u8;
961 offset += 1;
962 },
963 crate::types::DataType::UInt8 => {
964 let u = default_value.u8;
965 log_data[offset] = u;
966 offset += 1;
967 },
968 crate::types::DataType::Int16 => {
969 let i = default_value.i16;
970 log_data[offset..offset+2].copy_from_slice(&i.to_le_bytes());
971 offset += 2;
972 },
973 crate::types::DataType::UInt16 => {
974 let u = default_value.u16;
975 log_data[offset..offset+2].copy_from_slice(&u.to_le_bytes());
976 offset += 2;
977 },
978 crate::types::DataType::Int32 => {
979 let i = default_value.i32;
980 log_data[offset..offset+4].copy_from_slice(&i.to_le_bytes());
981 offset += 4;
982 },
983 crate::types::DataType::UInt32 => {
984 let u = default_value.u32;
985 log_data[offset..offset+4].copy_from_slice(&u.to_le_bytes());
986 offset += 4;
987 },
988 crate::types::DataType::Int64 => {
989 let i = default_value.i64;
990 log_data[offset..offset+8].copy_from_slice(&i.to_le_bytes());
991 offset += 8;
992 },
993 crate::types::DataType::UInt64 => {
994 let u = default_value.u64;
995 log_data[offset..offset+8].copy_from_slice(&u.to_le_bytes());
996 offset += 8;
997 },
998 crate::types::DataType::Float32 => {
999 let f = default_value.float32;
1000 log_data[offset..offset+4].copy_from_slice(&f.to_le_bytes());
1001 offset += 4;
1002 },
1003 crate::types::DataType::Float64 => {
1004 let f = default_value.float64;
1005 log_data[offset..offset+8].copy_from_slice(&f.to_le_bytes());
1006 offset += 8;
1007 },
1008 crate::types::DataType::String => {
1009 let s = default_value.string;
1010 let string_len = core::cmp::min(s.iter().position(|&c| c == 0).unwrap_or(64), 64);
1011 log_data[offset] = string_len as u8;
1012 offset += 1;
1013 log_data[offset..offset+string_len].copy_from_slice(&s[..string_len]);
1014 offset += 64; },
1016 crate::types::DataType::Timestamp | crate::types::DataType::TimestampTZ => {
1017 let t = default_value.time;
1018 log_data[offset..offset+8].copy_from_slice(&t.value.to_le_bytes());
1019 offset += 8;
1020 },
1021 crate::types::DataType::Interval => {
1022 let interval = default_value.interval;
1023 log_data[offset..offset+8].copy_from_slice(&interval.value.to_le_bytes());
1024 offset += 8;
1025 log_data[offset] = interval.precision;
1026 offset += 1;
1027 log_data[offset] = interval.flags;
1028 offset += 1;
1029 },
1030 }
1031 }
1032 }
1033
1034 let log_item = crate::transaction::LogItem {
1036 op_type: crate::transaction::LogOperation::CreateTable,
1037 table_id: table_def.id,
1038 record_id: 0,
1039 data_size: 512,
1040 old_data: [0; 512],
1041 new_data: log_data,
1042 tx_id: 0,
1043 timestamp: crate::platform::get_timestamp_us(),
1044 checksum: 0,
1045 };
1046
1047 let mut log_bytes = [0u8; core::mem::size_of::<crate::transaction::LogItem>()];
1049 core::ptr::write_unaligned(log_bytes.as_mut_ptr() as *mut crate::transaction::LogItem, log_item);
1050 let mut check_bytes = log_bytes.clone();
1051 let checksum_ptr = check_bytes.as_mut_ptr().add(core::mem::size_of::<crate::transaction::LogItem>() - 4) as *mut u32;
1052 *checksum_ptr = 0;
1053 let calculated_checksum = crate::transaction::Transaction::calculate_checksum(&check_bytes);
1054
1055 let mut final_log_item = log_item;
1056 final_log_item.checksum = calculated_checksum;
1057
1058 let _ = log_manager.write_log_item(&final_log_item);
1060 }
1061 }
1062
1063 Ok(())
1064 }
1065
1066 fn create_index(
1067 &mut self,
1068 table_name: &str,
1069 field_name: &str,
1070 index_type: IndexType
1071 ) -> Result<()> {
1072 let table_id = self.tables.iter().position(|t| {
1074 if let Some(table) = t {
1075 table.def.name == table_name
1076 } else {
1077 false
1078 }
1079 }).ok_or(RemDbError::TableNotFound)?;
1080
1081 let table = self.tables[table_id].as_ref().ok_or(RemDbError::TableNotFound)?;
1083 let field_index = table.def.fields.iter().position(|f| f.name == field_name)
1084 .ok_or(RemDbError::FieldNotFound)?;
1085
1086 if self.secondary_indices[table_id].is_some() {
1088 return Err(RemDbError::ConfigError);
1089 }
1090
1091 let mut new_fields = Vec::new();
1093 for field in table.def.fields {
1094 new_fields.push(FieldDef {
1095 name: field.name,
1096 data_type: field.data_type,
1097 size: field.size,
1098 offset: field.offset,
1099 primary_key: field.primary_key,
1100 not_null: field.not_null,
1101 unique: field.unique,
1102 auto_increment: field.auto_increment,
1103 default_value: field.default_value,
1104 });
1105 }
1106
1107 let new_def = alloc::boxed::Box::new(TableDef {
1108 id: table.def.id,
1109 name: table.def.name,
1110 fields: new_fields.leak(),
1111 primary_key: table.def.primary_key,
1112 secondary_index: Some(field_index),
1113 secondary_index_type: index_type,
1114 record_size: table.def.record_size,
1115 max_records: table.def.max_records,
1116 });
1117
1118 let max_items = table.def.max_records;
1120
1121 let index_max_nodes = match index_type {
1123 IndexType::BTree | IndexType::TTree => 100, IndexType::SortedArray => max_items, IndexType::Hash => max_items, };
1127
1128 let index_size = AnySecondaryIndex::calculate_memory_size(new_def.as_ref(), index_max_nodes);
1129 let index_memory = crate::memory::allocator::alloc(index_size)?;
1130
1131 let index = unsafe {
1133 AnySecondaryIndex::new(
1134 alloc::sync::Arc::from(new_def),
1135 index_memory.as_ptr(),
1136 index_max_nodes
1137 )?
1138 };
1139
1140 self.secondary_indices[table_id] = Some(index);
1142
1143 unsafe {
1145 if let Some(log_manager) = crate::transaction::TX_MANAGER.get_log_manager_mut() {
1147 let mut log_data = [0u8; 512];
1149 let table_name_bytes = table_name.as_bytes();
1151 let table_name_len = core::cmp::min(table_name_bytes.len(), 64);
1152 log_data[0] = table_name_len as u8;
1153 log_data[1..1+table_name_len].copy_from_slice(&table_name_bytes[..table_name_len]);
1154 let field_name_bytes = field_name.as_bytes();
1156 let field_name_len = core::cmp::min(field_name_bytes.len(), 64);
1157 log_data[65] = field_name_len as u8;
1158 log_data[66..66+field_name_len].copy_from_slice(&field_name_bytes[..field_name_len]);
1159 log_data[130] = index_type as u8;
1161
1162 let log_item = crate::transaction::LogItem {
1164 op_type: crate::transaction::LogOperation::CreateIndex,
1165 table_id: table.def.id,
1166 record_id: 0,
1167 data_size: 512,
1168 old_data: [0; 512],
1169 new_data: log_data,
1170 tx_id: 0,
1171 timestamp: crate::platform::get_timestamp_us(),
1172 checksum: 0,
1173 };
1174
1175 let mut log_bytes = [0u8; core::mem::size_of::<crate::transaction::LogItem>()];
1177 core::ptr::write_unaligned(log_bytes.as_mut_ptr() as *mut crate::transaction::LogItem, log_item);
1178 let mut check_bytes = log_bytes.clone();
1179 let checksum_ptr = check_bytes.as_mut_ptr().add(core::mem::size_of::<crate::transaction::LogItem>() - 4) as *mut u32;
1180 *checksum_ptr = 0;
1181 let calculated_checksum = crate::transaction::Transaction::calculate_checksum(&check_bytes);
1182
1183 let mut final_log_item = log_item;
1184 final_log_item.checksum = calculated_checksum;
1185
1186 let _ = log_manager.write_log_item(&final_log_item);
1188 }
1189 }
1190
1191 Ok(())
1192 }
1193
1194 fn create_time_series_table(
1195 &mut self,
1196 name: &str,
1197 time_field: &str,
1198 value_field: &str,
1199 tag_fields: &[&str],
1200 config: Option<TimeSeriesConfig>
1201 ) -> Result<()> {
1202 RemDb::create_time_series_table(self, name, time_field, value_field, tag_fields, config)
1204 }
1205}
1206
1207impl RemDb {
1208 pub fn dump_metrics(&self) -> alloc::string::String {
1210 self.metrics.snapshot().to_text()
1211 }
1212
1213 pub fn sql_query(&mut self, sql: &str) -> Result<sql::ResultSet> {
1215 let query = crate::sql::parse_sql_query(sql)
1217 .map_err(|_| RemDbError::InvalidSqlQuery)?;
1218
1219 let result_set = crate::sql::execute_query(self, &query)
1221 .map_err(|err| {
1222 match err {
1223 crate::sql::QueryExecutionError::TableNotFound => RemDbError::TableNotFound,
1224 crate::sql::QueryExecutionError::FieldNotFound => RemDbError::FieldNotFound,
1225 crate::sql::QueryExecutionError::TypeMismatch => RemDbError::TypeMismatch,
1226 crate::sql::QueryExecutionError::ConstraintsConflicts => RemDbError::DuplicateKey,
1227 crate::sql::QueryExecutionError::OutOfMemory => RemDbError::OutOfMemory,
1228 _ => {
1229 #[cfg(feature = "std")] eprintln!("SQL Execution Error: {:?}", err);
1231 RemDbError::InternalError
1232 }
1233 }
1234 })?;
1235
1236 Ok(result_set)
1237 }
1238
1239 pub fn execute_query(&mut self, table_name: &str, columns: &[&str], where_clause: Option<&str>, limit: Option<usize>) -> Result<sql::ResultSet> {
1241 let select_columns = if columns.is_empty() {
1243 "*".to_string() } else {
1245 columns.join(", ") };
1247
1248 let mut sql = alloc::format!("SELECT {} FROM {}", select_columns, table_name);
1249
1250 if let Some(where_clause) = where_clause {
1251 sql.push_str(&alloc::format!(" WHERE {}", where_clause));
1252 }
1253
1254 if let Some(limit) = limit {
1255 sql.push_str(&alloc::format!(" LIMIT {}", limit));
1256 }
1257
1258 self.sql_query(&sql)
1260 }
1261
1262 pub fn create_table(&mut self, table_name: &str, fields: &[(&str, DataType, Option<Value>)], primary_key: Option<usize>) -> Result<()> {
1264 DdlExecutor::create_table(self, table_name, fields, None, primary_key)
1266 }
1267
1268 pub fn create_time_series_table(&mut self, name: &str, time_field: &str, value_field: &str, tag_fields: &[&str], config: Option<TimeSeriesConfig>) -> Result<()> {
1270 let mut field_defs = Vec::new();
1273 let mut offset = 0;
1274 let mut record_size = 0;
1275
1276 let time_field_static = Box::leak(time_field.to_string().into_boxed_str());
1278 let time_field_size = DataType::Timestamp.size();
1279 field_defs.push(FieldDef {
1280 name: time_field_static,
1281 data_type: DataType::Timestamp,
1282 size: time_field_size,
1283 offset,
1284 primary_key: true,
1285 not_null: true,
1286 unique: false,
1287 auto_increment: false,
1288 default_value: Some(Value { time: crate::types::db_timestamp::new(0, 0, 0, 0) }),
1289 });
1290 offset += time_field_size;
1291 record_size += time_field_size;
1292
1293 let value_field_static = Box::leak(value_field.to_string().into_boxed_str());
1295 let value_field_size = DataType::Float64.size();
1296 field_defs.push(FieldDef {
1297 name: value_field_static,
1298 data_type: DataType::Float64,
1299 size: value_field_size,
1300 offset,
1301 primary_key: false,
1302 not_null: true,
1303 unique: false,
1304 auto_increment: false,
1305 default_value: Some(Value { float64: 0.0 }),
1306 });
1307 offset += value_field_size;
1308 record_size += value_field_size;
1309
1310 let mut tag_field_indices = Vec::new();
1312 for (i, tag_field) in tag_fields.iter().enumerate() {
1313 let tag_field_static = Box::leak(tag_field.to_string().into_boxed_str());
1314 let tag_field_size = MAX_STRING_LEN; field_defs.push(FieldDef {
1316 name: tag_field_static,
1317 data_type: DataType::String,
1318 size: tag_field_size,
1319 offset,
1320 primary_key: false,
1321 not_null: false,
1322 unique: false,
1323 auto_increment: false,
1324 default_value: None, });
1326 tag_field_indices.push((i + 2) as usize); offset += tag_field_size;
1328 record_size += tag_field_size;
1329 }
1330
1331 let table_name_static = Box::leak(name.to_string().into_boxed_str());
1333 let field_defs_static = Box::leak(field_defs.into_boxed_slice());
1334 let tag_field_indices_static = Box::leak(tag_field_indices.into_boxed_slice());
1335
1336 let table_def = TableDef {
1337 id: (self.tables.len() + self.time_series_tables.len()) as u8,
1338 name: table_name_static,
1339 fields: field_defs_static,
1340 primary_key: 0, secondary_index: None,
1342 secondary_index_type: IndexType::SortedArray,
1343 record_size,
1344 max_records: self.config.default_max_records,
1345 };
1346
1347 let time_series_table_def = time_series::TimeSeriesTableDef {
1349 base: table_def,
1350 time_field: 0, value_field: 1, tag_fields: tag_field_indices_static, config: config.unwrap_or(time_series::TimeSeriesConfig::DEFAULT), };
1355
1356 let index = time_series::TimeSeriesIndex::new();
1358
1359 let time_series_table = time_series::TimeSeriesTable::new(
1361 Arc::new(time_series_table_def),
1362 index
1363 )?;
1364
1365 self.time_series_tables.push(Some(time_series_table));
1367
1368 Ok(())
1369 }
1370
1371 pub fn get_time_series_table(&self, table_id: usize) -> Result<&time_series::TimeSeriesTable> {
1373 if table_id >= self.time_series_tables.len() {
1374 return Err(RemDbError::RecordNotFound);
1375 }
1376
1377 match &self.time_series_tables[table_id] {
1378 Some(table) => Ok(table),
1379 None => Err(RemDbError::RecordNotFound),
1380 }
1381 }
1382
1383 pub fn get_time_series_table_mut(&mut self, table_id: usize) -> Result<&mut time_series::TimeSeriesTable> {
1385 if table_id >= self.time_series_tables.len() {
1386 return Err(RemDbError::RecordNotFound);
1387 }
1388
1389 match &mut self.time_series_tables[table_id] {
1390 Some(table) => Ok(table),
1391 None => Err(RemDbError::RecordNotFound),
1392 }
1393 }
1394
1395 pub fn time_series_table_count(&self) -> usize {
1397 self.time_series_tables.len()
1398 }
1399
1400 pub fn write_timeseries_batch(
1403 &mut self,
1404 table_name: &str,
1405 data_points: &[time_series::TimeSeriesRecord]
1406 ) -> Result<usize> {
1407 if data_points.is_empty() {
1408 return Err(RemDbError::ConfigError);
1409 }
1410
1411 let table_id = self.time_series_tables.iter().position(|table| {
1413 if let Some(table) = table {
1414 table.def.base.name == table_name
1415 } else {
1416 false
1417 }
1418 }).ok_or(RemDbError::TableNotFound)?;
1419
1420 let table = match &mut self.time_series_tables[table_id] {
1421 Some(table) => table,
1422 None => return Err(RemDbError::TableNotFound),
1423 };
1424
1425 let has_active_tx = unsafe { crate::transaction::has_active_tx() };
1431
1432 let mut inserted = 0;
1435
1436 if !has_active_tx {
1437 for record in data_points {
1440 let mut partitions_guard = table.partitions.lock().unwrap();
1442 let partition = partitions_guard.get_or_create_partition(record.timestamp);
1443
1444 let mut partition_guard = partition.lock().unwrap();
1446 partition_guard.records.push(*record);
1447 partition_guard.stats.record_count = partition_guard.records.len();
1448
1449 table.index.insert(record.timestamp, inserted as usize);
1451
1452 inserted += 1;
1453 }
1454
1455 Ok(inserted)
1456 } else {
1457 table.write_timeseries_batch(data_points)
1459 }
1460 }
1461
1462 pub fn create_index(&mut self, table_name: &str, field_name: &str, index_type: IndexType) -> Result<()> {
1464 DdlExecutor::create_index(self, table_name, field_name, index_type)
1466 }
1467
1468 pub fn insert_record(&mut self, table_name: &str, column_names: &[&str], values: &[&str]) -> Result<usize> {
1470 let columns = if column_names.is_empty() {
1472 "".to_string() } else {
1474 alloc::format!("({})
1475", column_names.join(", ")) };
1477
1478 let quoted_values: Vec<String> = values.iter().map(|&value| {
1480 if value.chars().all(|c| c.is_digit(10) || c == '.' || c == '-') || value == "true" || value == "false" {
1482 value.to_string()
1483 } else {
1484 alloc::format!("'{}'", value)
1486 }
1487 }).collect();
1488
1489 let values_str = alloc::format!("({})
1490", quoted_values.join(", "));
1491
1492 let sql = alloc::format!("INSERT INTO {}{} VALUES {}", table_name, columns, values_str);
1493
1494 let result_set = self.sql_query(&sql)?;
1496
1497 if let Some(row) = result_set.rows.first() {
1499 if let Some(value) = row.values.first() {
1500 unsafe {
1502 let affected_rows = value.value.u64 as usize;
1503 return Ok(affected_rows);
1504 }
1505 }
1506 }
1507
1508 Ok(0)
1509 }
1510
1511 pub fn batch_insert_record(&mut self, table_name: &str, column_names: &[&str], records: &[&[&str]]) -> Result<usize> {
1513 if records.is_empty() {
1514 return Ok(0);
1515 }
1516
1517 let columns = if column_names.is_empty() {
1519 "".to_string()
1520 } else {
1521 alloc::format!("({})
1522", column_names.join(", "))
1523 };
1524
1525 let mut all_values: Vec<String> = Vec::with_capacity(records.len());
1527
1528 for values in records {
1529 let quoted_values: Vec<String> = values.iter().map(|&value| {
1530 if value.chars().all(|c| c.is_digit(10) || c == '.' || c == '-') || value == "true" || value == "false" {
1532 value.to_string()
1533 } else {
1534 alloc::format!("'{}'", value)
1536 }
1537 }).collect();
1538
1539 all_values.push(alloc::format!("({})
1540", quoted_values.join(", ")));
1541 }
1542
1543 let values_str = all_values.join(", ");
1544 let sql = alloc::format!("INSERT INTO {}{} VALUES {}", table_name, columns, values_str);
1545
1546 let result_set = self.sql_query(&sql)?;
1548
1549 if let Some(row) = result_set.rows.first() {
1551 if let Some(value) = row.values.first() {
1552 unsafe {
1554 let affected_rows = value.value.u64 as usize;
1555 return Ok(affected_rows);
1556 }
1557 }
1558 }
1559
1560 Ok(0)
1561 }
1562
1563 pub fn update_record(&mut self, table_name: &str, set_clause: &str, where_clause: Option<&str>) -> Result<usize> {
1565 let mut sql = alloc::format!("UPDATE {} SET {}", table_name, set_clause);
1567
1568 if let Some(where_clause) = where_clause {
1569 sql.push_str(&alloc::format!(" WHERE {}", where_clause));
1570 }
1571
1572 let result_set = self.sql_query(&sql)?;
1574
1575 if let Some(row) = result_set.rows.first() {
1577 if let Some(value) = row.values.first() {
1578 unsafe {
1580 let affected_rows = value.value.u64 as usize;
1581 return Ok(affected_rows);
1582 }
1583 }
1584 }
1585
1586 Ok(0)
1587 }
1588
1589 pub fn delete_record(&mut self, table_name: &str, where_clause: Option<&str>) -> Result<usize> {
1591 let mut sql = alloc::format!("DELETE FROM {}", table_name);
1593
1594 if let Some(where_clause) = where_clause {
1595 sql.push_str(&alloc::format!(" WHERE {}", where_clause));
1596 }
1597
1598 let result_set = self.sql_query(&sql)?;
1600
1601 if let Some(row) = result_set.rows.first() {
1603 if let Some(value) = row.values.first() {
1604 unsafe {
1606 let affected_rows = value.value.u64 as usize;
1607 return Ok(affected_rows);
1608 }
1609 }
1610 }
1611
1612 Ok(0)
1613 }
1614
/// Writes the schema of every table to `path` as SQL DDL text.
///
/// For each regular table a `CREATE TABLE` statement is written, followed by
/// a `CREATE INDEX ... USING <type>` statement when the definition names a
/// secondary-index field that exists. For each time-series table a
/// `CREATE TIMESERIES TABLE ... WITH COMPRESSION = (...), TTL = '<n> days'`
/// statement is written, where `<n>` is `retention_period_secs` divided
/// (integer division) by the number of seconds in a day. Table names are
/// lower-cased; field names are written as stored.
///
/// # Errors
/// `RemDbError::FileIoError` when the file cannot be created or written.
#[cfg(feature = "std")]
pub fn export_ddl(&self, path: &str) -> Result<()> {
    use std::fs::File;
    use std::io::Write;

    let mut file = File::create(path).map_err(|_| RemDbError::FileIoError)?;

    for table in self.tables.iter().flatten() {
        let def = &table.def;
        let table_name = def.name.to_lowercase();

        let columns: Vec<String> = def
            .fields
            .iter()
            .map(|field| {
                format!(" {} {} {}",
                    field.name,
                    field.data_type.to_sql_type(field.size),
                    field.constraints_to_sql())
            })
            .collect();

        let mut create_table_sql = format!("CREATE TABLE {} (\n", table_name);
        create_table_sql.push_str(&columns.join(",\n"));
        create_table_sql.push_str("\n);\n\n");

        file.write_all(create_table_sql.as_bytes()).map_err(|_| RemDbError::FileIoError)?;

        // A secondary index is only exported when its field position is valid.
        let indexed_field = def
            .secondary_index
            .and_then(|position| def.fields.get(position));

        if let Some(index_field) = indexed_field {
            let index_name = format!("idx_{}_{}", table_name, index_field.name);
            let index_type = match def.secondary_index_type {
                IndexType::Hash => "hash",
                IndexType::SortedArray => "sortedarray",
                IndexType::BTree => "btree",
                IndexType::TTree => "ttree",
            };

            let create_index_sql = format!("CREATE INDEX {} ON {} USING {} ({});\n\n",
                index_name, table_name, index_type, index_field.name);

            file.write_all(create_index_sql.as_bytes()).map_err(|_| RemDbError::FileIoError)?;
        }
    }

    for ts_table in self.time_series_tables.iter().flatten() {
        let def = &ts_table.def;
        let base_def = &def.base;

        let columns: Vec<String> = base_def
            .fields
            .iter()
            .map(|field| {
                format!(" {} {}",
                    field.name,
                    field.data_type.to_sql_type(field.size))
            })
            .collect();

        let compression_alg = match def.config.compression {
            crate::time_series::CompressionType::None => "none",
            crate::time_series::CompressionType::Delta => "delta",
            crate::time_series::CompressionType::RunLength => "runlength",
            crate::time_series::CompressionType::DeltaRunLength => "delta-runlength",
            crate::time_series::CompressionType::DeltaDelta => "delta-delta",
        };
        let ttl_days = def.config.retention_period_secs / (24 * 3600);

        // Both options are always emitted, so the WITH clause is never empty.
        let with_clauses = [
            format!("COMPRESSION = (algorithm='{}', enabled=true)", compression_alg),
            format!("TTL = '{} days'", ttl_days),
        ];

        let mut create_ts_table_sql =
            format!("CREATE TIMESERIES TABLE {} (\n", base_def.name.to_lowercase());
        create_ts_table_sql.push_str(&columns.join(",\n"));
        create_ts_table_sql.push_str(&format!("\n) WITH {}\n\n", with_clauses.join(", ")));

        file.write_all(create_ts_table_sql.as_bytes()).map_err(|_| RemDbError::FileIoError)?;
    }

    Ok(())
}
1722
/// Dumps the records of every regular table to `path` as `INSERT` statements.
///
/// All statements are accumulated in memory first; the file is created and
/// written only after every table has been iterated. Each record becomes
/// `INSERT INTO <table> (<fields>) VALUES (<values>);` with the table name
/// lower-cased. Field values are decoded from the raw record bytes at each
/// field's offset according to its `DataType`; string fields are read up to
/// the first NUL byte (or the field size) and written in single quotes.
///
/// String bytes are decoded as UTF-8 (invalid sequences become U+FFFD) so
/// multi-byte characters survive the export instead of being split into one
/// character per byte.
///
/// NOTE(review): single quotes inside string values are not escaped; whether
/// the SQL parser supports an escape syntax is not visible from here.
///
/// # Errors
/// * Any error returned by the table's `iterate` (previously this panicked).
/// * `RemDbError::FileIoError` when the file cannot be created or written.
#[cfg(feature = "std")]
pub fn export_data(&self, path: &str) -> Result<()> {
    use std::fs::File;
    use std::io::Write;

    let mut sql_statements = alloc::string::String::new();

    for table in self.tables.iter().flatten() {
        let table_ref = table.def.clone();

        // The statement head is identical for every record of a table, so it
        // is built once instead of once per record.
        let column_names: Vec<&str> = table_ref.fields.iter().map(|field| field.name).collect();
        let insert_prefix = format!(
            "INSERT INTO {} ({}) VALUES (",
            table_ref.name.to_lowercase(),
            column_names.join(", ")
        );

        // SAFETY: relies on `iterate` handing out pointers to complete records
        // laid out as described by `table_ref.fields` (offset/size per field),
        // so every read below stays inside the record — TODO confirm against
        // `MemoryTable::iterate`.
        unsafe {
            table.iterate(|_id, record_ptr| {
                let mut field_values = Vec::with_capacity(table_ref.fields.len());

                for field in table_ref.fields.iter() {
                    let field_ptr = record_ptr.add(field.offset);
                    let value_str = match field.data_type {
                        DataType::UInt8 => format!("{}", *field_ptr as u8),
                        DataType::UInt16 => format!("{}", core::ptr::read_unaligned(field_ptr as *const u16)),
                        DataType::UInt32 => format!("{}", core::ptr::read_unaligned(field_ptr as *const u32)),
                        DataType::UInt64 => format!("{}", core::ptr::read_unaligned(field_ptr as *const u64)),
                        DataType::Int8 => format!("{}", core::ptr::read_unaligned(field_ptr as *const i8)),
                        DataType::Int16 => format!("{}", core::ptr::read_unaligned(field_ptr as *const i16)),
                        DataType::Int32 => format!("{}", core::ptr::read_unaligned(field_ptr as *const i32)),
                        DataType::Int64 => format!("{}", core::ptr::read_unaligned(field_ptr as *const i64)),
                        DataType::Float32 => format!("{}", core::ptr::read_unaligned(field_ptr as *const f32)),
                        DataType::Float64 => format!("{}", core::ptr::read_unaligned(field_ptr as *const f64)),
                        DataType::Bool => format!("{}", *field_ptr != 0),
                        DataType::Timestamp => format!("{}", core::ptr::read_unaligned(field_ptr as *const crate::types::db_timestamp).value),
                        DataType::TimestampTZ => format!("{}", core::ptr::read_unaligned(field_ptr as *const crate::types::db_timestamp).value),
                        DataType::String => {
                            // Collect the raw bytes up to the terminator, then
                            // decode them together as UTF-8.
                            let mut raw_bytes = Vec::with_capacity(field.size);
                            for i in 0..field.size {
                                let byte = *field_ptr.add(i);
                                if byte == 0 {
                                    break;
                                }
                                raw_bytes.push(byte);
                            }
                            alloc::format!("'{}'", alloc::string::String::from_utf8_lossy(&raw_bytes))
                        },
                        DataType::Interval => {
                            alloc::format!("{}", core::ptr::read_unaligned(field_ptr as *const crate::types::db_interval).value)
                        },
                    };

                    field_values.push(value_str);
                }

                sql_statements.push_str(&insert_prefix);
                sql_statements.push_str(&field_values.join(", "));
                sql_statements.push_str(");\n");

                // Keep iterating over the remaining records.
                true
            })?;
        }
    }

    let mut file = File::create(path).map_err(|_| RemDbError::FileIoError)?;
    file.write_all(sql_statements.as_bytes()).map_err(|_| RemDbError::FileIoError)?;

    Ok(())
}
1811
1812 pub fn save_incremental_snapshot(&mut self, path: &str) -> Result<()> {
1814 let handle = crate::platform::file_open(path, crate::platform::FileMode::Write)
1816 .map_err(|_| RemDbError::FileIoError)?;
1817
1818 let _defer = Defer::new(|| {
1820 let _ = crate::platform::file_close(handle);
1821 });
1822
1823 let magic = Self::SNAPSHOT_MAGIC.to_le_bytes();
1825 let written = crate::platform::file_write(handle, magic.as_ptr(), magic.len())
1826 .map_err(|_| RemDbError::FileIoError)?;
1827 if written != magic.len() {
1828 return Err(RemDbError::FileIoError);
1829 }
1830
1831 let version = Self::SNAPSHOT_VERSION.to_le_bytes();
1833 let written = crate::platform::file_write(handle, version.as_ptr(), version.len())
1834 .map_err(|_| RemDbError::FileIoError)?;
1835 if written != version.len() {
1836 return Err(RemDbError::FileIoError);
1837 }
1838
1839 let snapshot_type = 1u8;
1841 let written = crate::platform::file_write(handle, &snapshot_type as *const u8, 1)
1842 .map_err(|_| RemDbError::FileIoError)?;
1843 if written != 1 {
1844 return Err(RemDbError::FileIoError);
1845 }
1846
1847 let base_version_bytes = self.snapshot_version.to_le_bytes();
1849 let written = crate::platform::file_write(handle, base_version_bytes.as_ptr(), base_version_bytes.len())
1850 .map_err(|_| RemDbError::FileIoError)?;
1851 if written != base_version_bytes.len() {
1852 return Err(RemDbError::FileIoError);
1853 }
1854
1855 let table_count = self.config.tables.len() as u32;
1857 let table_count_bytes = table_count.to_le_bytes();
1858 let written = crate::platform::file_write(handle, table_count_bytes.as_ptr(), table_count_bytes.len())
1859 .map_err(|_| RemDbError::FileIoError)?;
1860 if written != table_count_bytes.len() {
1861 return Err(RemDbError::FileIoError);
1862 }
1863
1864 for table_id in 0..table_count as usize {
1866 if let Some(table) = &mut self.tables[table_id] {
1867 let mut changed_records = 0;
1869 let mut record_indices = Vec::new();
1870
1871 for i in 0..table.def.max_records {
1872 let status_ptr = unsafe { table.get_status_ptr(i) };
1873 if unsafe { (*status_ptr).status } == crate::types::RecordStatus::Used {
1874 let status = unsafe { &*status_ptr };
1875 if status.version > table.snapshot_version as u16 {
1876 changed_records += 1;
1877 record_indices.push(i);
1878 }
1879 }
1880 }
1881
1882 let table_id_u32 = table_id as u32;
1884 let table_id_bytes = table_id_u32.to_le_bytes();
1885 let written = crate::platform::file_write(handle, table_id_bytes.as_ptr(), table_id_bytes.len())
1886 .map_err(|_| RemDbError::FileIoError)?;
1887 if written != table_id_bytes.len() {
1888 return Err(RemDbError::FileIoError);
1889 }
1890
1891 let changed_count_u32 = changed_records as u32;
1893 let changed_count_bytes = changed_count_u32.to_le_bytes();
1894 let written = crate::platform::file_write(handle, changed_count_bytes.as_ptr(), changed_count_bytes.len())
1895 .map_err(|_| RemDbError::FileIoError)?;
1896 if written != changed_count_bytes.len() {
1897 return Err(RemDbError::FileIoError);
1898 }
1899
1900 let mut record_size = 0;
1902 for field in table.def.fields {
1903 record_size += field.size;
1904 }
1905
1906 for i in record_indices {
1908 let index_u32 = i as u32;
1910 let index_bytes = index_u32.to_le_bytes();
1911 let written = crate::platform::file_write(handle, index_bytes.as_ptr(), index_bytes.len())
1912 .map_err(|_| RemDbError::FileIoError)?;
1913 if written != index_bytes.len() {
1914 return Err(RemDbError::FileIoError);
1915 }
1916
1917 let record_ptr = unsafe { table.get_record_ptr(i) };
1919 let written = crate::platform::file_write(handle, record_ptr, record_size)
1920 .map_err(|_| RemDbError::FileIoError)?;
1921 if written != record_size {
1922 return Err(RemDbError::FileIoError);
1923 }
1924 }
1925
1926 table.snapshot_version = self.snapshot_version;
1928 }
1929 }
1930
1931 Ok(())
1933 }
1934}
1935
1936
1937
1938
// Process-wide database instance: `None` until `init_global_db` stores one,
// and set back to `None` by `reset_global_db`.
// NOTE(review): a `static mut` carries no synchronization — this presumably
// assumes single-threaded access; confirm against callers.
static mut DB_INSTANCE: Option<RemDb> = None;
1941
1942pub fn init_global_db(
1945 config: &'static config::DbConfig
1946) -> Result<&'static mut RemDb> {
1947 unsafe {
1948 let mut db = RemDb::new(config);
1950 db.init()?;
1951
1952 for table_def in config.tables {
1954 let table = MemoryTable::new(alloc::sync::Arc::new(*table_def))?;
1956 db.tables.push(Some(table));
1957
1958 db.primary_indices.push(None);
1960 db.secondary_indices.push(None);
1961 }
1962
1963 DB_INSTANCE = Some(db);
1965
1966 Ok(DB_INSTANCE.as_mut().unwrap())
1967 }
1968}
1969
1970pub fn get_global_db() -> Option<&'static mut RemDb> {
1972 unsafe {
1973 DB_INSTANCE.as_mut()
1974 }
1975}
1976
/// Tears down the global database state.
///
/// In order: calls `crate::ha::shutdown()` when the `ha` feature is enabled
/// (its result is ignored), drops the global `RemDb` by storing `None`, then
/// calls `reset()` and `clear_log_manager()` on the global transaction manager.
///
/// NOTE(review): references previously obtained from `init_global_db` /
/// `get_global_db` dangle after this call — callers must not use them.
pub fn reset_global_db() {
    unsafe {
        // Result deliberately ignored: teardown continues regardless.
        #[cfg(feature = "ha")]
        let _ = crate::ha::shutdown();

        // Assigning `None` drops the previous instance, if any.
        DB_INSTANCE = None;
        crate::transaction::TX_MANAGER.reset();
        crate::transaction::TX_MANAGER.clear_log_manager();
    }
}
1992
// Private `c_api` submodule, compiled only when the `c-api` feature is enabled.
#[cfg(feature = "c-api")]
mod c_api;
1996
// Panic handler for builds without the `std` feature (and outside tests).
// It never returns: the panic info is ignored and the CPU spins forever,
// issuing a spin-loop hint on every iteration.
#[cfg(all(not(feature = "std"), not(test)))]
#[panic_handler]
fn panic_handler(_info: &core::panic::PanicInfo) -> ! {
    loop {
        core::hint::spin_loop();
    }
}