1use core::ptr::NonNull;
2use crate::{types::{RecordHeader, RecordStatus, TableDef, Value, Result, RemDbError, DataType}, DataType as CrateDataType};
3use crate::platform::{memcpy, memset};
4use crate::defer;
5
6extern crate alloc;
8
9pub struct MemoryTable {
11 pub def: alloc::sync::Arc<TableDef>,
13 pub data_start: NonNull<u8>,
15 pub status_array: NonNull<RecordHeader>,
17 pub record_count: usize,
19 pub lock: u32,
21 pub record_size: usize,
23 pub free_slots: NonNull<usize>,
25 pub free_slot_count: usize,
27 pub low_power_mode: bool,
29 pub low_power_max_records: Option<usize>,
31 pub snapshot_version: u32,
33 pub next_auto_id: u64,
35}
36
37impl Drop for MemoryTable {
39 fn drop(&mut self) {
40 unsafe {
41 crate::memory::allocator::free(self.data_start);
43 crate::memory::allocator::free(self.status_array.cast());
45 crate::memory::allocator::free(self.free_slots.cast());
47 }
48 }
49}
50
51impl MemoryTable {
52 pub fn new(def: alloc::sync::Arc<TableDef>) -> Result<Self> {
54 let data_size = def.record_size * def.max_records;
56 let status_size = core::mem::size_of::<RecordHeader>() * def.max_records;
57 let free_slots_size = core::mem::size_of::<usize>() * def.max_records;
58
59 let data_start = crate::memory::allocator::alloc(data_size)?;
61 let status_start = crate::memory::allocator::alloc(status_size)?;
62 let free_slots_start = crate::memory::allocator::alloc(free_slots_size)?;
63
64 unsafe {
66 let status_array = status_start.cast::<RecordHeader>();
67 for i in 0..def.max_records {
68 let status_ptr = status_array.as_ptr().add(i);
69 (*status_ptr).status = RecordStatus::Free;
70 (*status_ptr).version = 0;
71 (*status_ptr).lock_type = crate::types::LockType::None;
72 (*status_ptr).lock_owner = 0;
73 (*status_ptr).lock_count = 0;
74 }
75
76 let free_slots = free_slots_start.cast::<usize>();
78 for i in 0..def.max_records {
79 *free_slots.as_ptr().add(i) = (def.max_records - 1 - i) as usize;
80 }
81 }
82
83 Ok(MemoryTable {
84 def: def.clone(),
85 data_start,
86 status_array: status_start.cast(),
87 record_count: 0,
88 lock: 0,
89 record_size: def.record_size, free_slots: free_slots_start.cast(),
91 free_slot_count: def.max_records,
92 low_power_mode: false, low_power_max_records: None, snapshot_version: 0, next_auto_id: 1, })
97 }
98
99 pub const fn calculate_memory_size(def: &TableDef) -> usize {
101 let data_size = def.record_size * def.max_records;
103 let status_size = core::mem::size_of::<RecordHeader>() * def.max_records;
105 let free_slots_size = core::mem::size_of::<usize>() * def.max_records;
107
108 data_size + status_size + free_slots_size
109 }
110
111 pub unsafe fn validate_constraints(&self, record_data: *const u8, exclude_slot: Option<usize>) -> Result<()>
113 {
114 for field in self.def.fields {
118 if field.not_null {
119 let is_null = match field.data_type {
121 DataType::String => {
122 let str_ptr = record_data.add(field.offset) as *const u8;
124 let mut all_zero = true;
125 for i in 0..field.size {
126 if *str_ptr.add(i) != 0 {
127 all_zero = false;
128 break;
129 }
130 }
131 all_zero
132 },
133 DataType::Bool => {
134 false
136 },
137 DataType::Float32 => {
138 let float_value = core::ptr::read_unaligned(record_data.add(field.offset) as *const f32);
140 float_value.is_nan()
141 },
142 DataType::Float64 => {
143 let float_value = core::ptr::read_unaligned(record_data.add(field.offset) as *const f64);
145 float_value.is_nan()
146 },
147 _ => {
148 false
151 },
152 };
153 if is_null {
154 return Err(RemDbError::TypeMismatch);
155 }
156 }
157 }
158
159 let primary_key_index = self.def.primary_key;
161 if primary_key_index < self.def.fields.len() {
162 let primary_key_field = &self.def.fields[primary_key_index];
164 let primary_key_offset = primary_key_field.offset;
165 let primary_key_data_type = primary_key_field.data_type;
166
167 let primary_key_ptr = record_data.add(primary_key_offset);
169
170 let mut has_duplicate = false;
172
173 for i in 0..self.def.max_records {
178 let status_ptr = self.status_array.as_ptr().add(i);
179 if (*status_ptr).status == RecordStatus::Used {
180 if Some(i) == exclude_slot {
182 continue;
183 }
184
185 let other_record_ptr = self.data_start.as_ptr().add(i * self.record_size);
187 let other_pk_ptr = other_record_ptr.add(primary_key_offset);
188
189 let is_duplicate = match primary_key_data_type {
191 DataType::UInt8 => {
192 *primary_key_ptr as u8 == *other_pk_ptr as u8
193 },
194 DataType::UInt16 => {
195 core::ptr::read_unaligned(primary_key_ptr as *const u16) ==
196 core::ptr::read_unaligned(other_pk_ptr as *const u16)
197 },
198 DataType::UInt32 => {
199 core::ptr::read_unaligned(primary_key_ptr as *const u32) ==
200 core::ptr::read_unaligned(other_pk_ptr as *const u32)
201 },
202 DataType::UInt64 => {
203 core::ptr::read_unaligned(primary_key_ptr as *const u64) ==
204 core::ptr::read_unaligned(other_pk_ptr as *const u64)
205 },
206 DataType::Int8 => {
207 core::ptr::read_unaligned(primary_key_ptr as *const i8) ==
208 core::ptr::read_unaligned(other_pk_ptr as *const i8)
209 },
210 DataType::Int16 => {
211 core::ptr::read_unaligned(primary_key_ptr as *const i16) ==
212 core::ptr::read_unaligned(other_pk_ptr as *const i16)
213 },
214 DataType::Int32 => {
215 core::ptr::read_unaligned(primary_key_ptr as *const i32) ==
216 core::ptr::read_unaligned(other_pk_ptr as *const i32)
217 },
218 DataType::Int64 => {
219 core::ptr::read_unaligned(primary_key_ptr as *const i64) ==
220 core::ptr::read_unaligned(other_pk_ptr as *const i64)
221 },
222 DataType::Float32 => {
223 core::ptr::read_unaligned(primary_key_ptr as *const f32) ==
224 core::ptr::read_unaligned(other_pk_ptr as *const f32)
225 },
226 DataType::Float64 => {
227 core::ptr::read_unaligned(primary_key_ptr as *const f64) ==
228 core::ptr::read_unaligned(other_pk_ptr as *const f64)
229 },
230 _ => {
231 false
233 },
234 };
235
236 if is_duplicate {
237 has_duplicate = true;
238 break;
239 }
240 }
241 }
242
243 if has_duplicate {
244 return Err(RemDbError::DuplicateKey);
245 }
246 }
247
248 Ok(())
249 }
250
251 unsafe fn get_field_by_offset(&self, record_data: *const u8, offset: usize, data_type: DataType, size: usize) -> Result<Value>
253 {
254 let field_ptr = record_data.add(offset);
255
256 let value = match data_type {
257 DataType::UInt8 => Value { u8: *field_ptr as u8 },
258 DataType::UInt16 => Value { u16: core::ptr::read_unaligned(field_ptr as *const u16) },
259 DataType::UInt32 => Value { u32: core::ptr::read_unaligned(field_ptr as *const u32) },
260 DataType::UInt64 => Value { u64: core::ptr::read_unaligned(field_ptr as *const u64) },
261 DataType::Int8 => Value { i8: core::ptr::read_unaligned(field_ptr as *const i8) },
262 DataType::Int16 => Value { i16: core::ptr::read_unaligned(field_ptr as *const i16) },
263 DataType::Int32 => Value { i32: core::ptr::read_unaligned(field_ptr as *const i32) },
264 DataType::Int64 => Value { i64: core::ptr::read_unaligned(field_ptr as *const i64) },
265 DataType::Float32 => Value { float32: core::ptr::read_unaligned(field_ptr as *const f32) },
266 DataType::Float64 => Value { float64: core::ptr::read_unaligned(field_ptr as *const f64) },
267 DataType::Bool => Value { bool: *field_ptr != 0 },
268 DataType::Timestamp => Value { timestamp: core::ptr::read_unaligned(field_ptr as *const u64) },
269 DataType::String => {
270 let mut str_value = [0u8; crate::types::MAX_STRING_LEN];
271 memcpy(str_value.as_mut_ptr(), field_ptr, size);
272 Value { string: str_value }
273 },
274 };
275
276 Ok(value)
277 }
278
279 pub fn insert(&mut self, record_data: *const u8) -> Result<usize> {
281 crate::get_global_db().map(|db| db.metrics.inc_write_ops());
283
284 let max_records = if self.low_power_mode {
286 self.low_power_max_records.unwrap_or(self.def.max_records)
287 } else {
288 self.def.max_records
289 };
290
291 let mut record_buffer = [0u8; 512];
293 let record_ptr: *const u8;
294
295 let primary_key_field = &self.def.fields[self.def.primary_key];
297 let mut needs_auto_increment = primary_key_field.auto_increment;
298
299 if needs_auto_increment {
301 unsafe {
302 let pk_offset = primary_key_field.offset;
303 let is_zero = match primary_key_field.data_type {
304 DataType::UInt8 => *record_data.add(pk_offset) == 0,
305 DataType::UInt16 => core::ptr::read_unaligned(record_data.add(pk_offset) as *const u16) == 0,
306 DataType::UInt32 => core::ptr::read_unaligned(record_data.add(pk_offset) as *const u32) == 0,
307 DataType::UInt64 => core::ptr::read_unaligned(record_data.add(pk_offset) as *const u64) == 0,
308 DataType::Int8 => core::ptr::read_unaligned(record_data.add(pk_offset) as *const i8) == 0,
309 DataType::Int16 => core::ptr::read_unaligned(record_data.add(pk_offset) as *const i16) == 0,
310 DataType::Int32 => core::ptr::read_unaligned(record_data.add(pk_offset) as *const i32) == 0,
311 DataType::Int64 => core::ptr::read_unaligned(record_data.add(pk_offset) as *const i64) == 0,
312 _ => true,
313 };
314
315 if !is_zero {
317 needs_auto_increment = false;
318 }
319 }
320 }
321
322 if needs_auto_increment {
323 crate::platform::spin_lock(&mut self.lock);
325 let auto_id = self.next_auto_id;
326 self.next_auto_id += 1;
327 crate::platform::spin_unlock(&mut self.lock);
328
329 unsafe {
331 memcpy(record_buffer.as_mut_ptr(), record_data, self.record_size);
332 }
333
334 unsafe {
336 let pk_offset = primary_key_field.offset;
337 match primary_key_field.data_type {
338 DataType::UInt8 => {
339 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut u8) = auto_id as u8;
340 },
341 DataType::UInt16 => {
342 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut u16) = auto_id as u16;
343 },
344 DataType::UInt32 => {
345 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut u32) = auto_id as u32;
346 },
347 DataType::UInt64 => {
348 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut u64) = auto_id;
349 },
350 DataType::Int8 => {
351 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut i8) = auto_id as i8;
352 },
353 DataType::Int16 => {
354 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut i16) = auto_id as i16;
355 },
356 DataType::Int32 => {
357 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut i32) = auto_id as i32;
358 },
359 DataType::Int64 => {
360 *(record_buffer.as_mut_ptr().add(pk_offset) as *mut i64) = auto_id as i64;
361 },
362 _ => {
363 return Err(RemDbError::TypeMismatch);
364 }
365 }
366 }
367
368 record_ptr = record_buffer.as_ptr();
369 } else {
370 record_ptr = record_data;
371 }
372
373 unsafe {
376 self.validate_constraints(record_ptr, None)?;
377 }
378
379 crate::platform::spin_lock(&mut self.lock);
381 defer! { crate::platform::spin_unlock(&mut self.lock); }
382
383 let mut slot_id = 0;
384 let mut is_overwrite = false;
385
386 if self.record_count >= max_records {
387 if self.low_power_mode {
388 let mut oldest_id = 0;
391 let mut oldest_version = u16::MAX;
392
393 for i in 0..self.def.max_records {
394 unsafe {
395 let status_ptr = self.status_array.as_ptr().add(i);
396 let status = &*status_ptr;
397 if status.status == RecordStatus::Used && status.version < oldest_version {
398 oldest_id = i;
399 oldest_version = status.version;
400 }
401 }
402 }
403
404 slot_id = oldest_id;
405 is_overwrite = true;
406 } else {
407 return Err(RemDbError::OutOfMemory);
409 }
410 } else {
411 if self.free_slot_count == 0 {
413 return Err(RemDbError::OutOfMemory);
414 }
415
416 slot_id = unsafe {
418 self.free_slot_count -= 1;
419 *self.free_slots.as_ptr().add(self.free_slot_count)
420 };
421 }
422
423 let dest_record_ptr = unsafe { self.data_start.as_ptr().add(slot_id * self.record_size) };
425
426 if let Some(mut tx) = crate::transaction::get_current_tx() {
428 let tx_mut = unsafe { tx.as_mut() };
429 if tx_mut.is_active() && !tx_mut.is_read_only() {
430 let mut new_data = [0u8; 512];
432 memcpy(new_data.as_mut_ptr(), record_ptr, self.record_size);
433
434 unsafe {
436 tx_mut.add_log_item(
437 crate::transaction::LogOperation::Insert,
438 self.def.id,
439 slot_id as u16,
440 core::ptr::null(),
441 new_data.as_ptr(),
442 self.record_size
443 )?;
444 }
445 }
446 }
447
448 memcpy(dest_record_ptr, record_ptr, self.record_size);
450
451 let status_ptr = unsafe { self.status_array.as_ptr().add(slot_id) };
453 unsafe {
454 (*status_ptr).status = RecordStatus::Used;
455 (*status_ptr).version += 1;
456 }
457
458 if !is_overwrite {
460 self.record_count += 1;
461 crate::get_global_db().map(|db| db.metrics.add_used_memory(self.record_size));
463 }
464
465 Ok(slot_id)
466 }
467
468 pub unsafe fn update(&mut self, id: usize, record_data: *const u8) -> Result<()> {
470 crate::get_global_db().map(|db| db.metrics.inc_update_ops());
472
473 if id >= self.def.max_records {
475 return Err(RemDbError::RecordNotFound);
476 }
477
478 let status_ptr = self.status_array.as_ptr().add(id);
480 if (*status_ptr).status != RecordStatus::Used {
481 return Err(RemDbError::RecordNotFound);
482 }
483
484 self.validate_constraints(record_data, Some(id))?;
486
487 crate::platform::spin_lock(&mut self.lock);
489 defer! { crate::platform::spin_unlock(&mut self.lock); }
490
491 let record_ptr = self.data_start.as_ptr().add(id * self.record_size);
493
494 if let Some(mut tx) = crate::transaction::get_current_tx() {
496 let tx_mut = tx.as_mut();
497 if tx_mut.is_active() && !tx_mut.is_read_only() {
498 let mut old_data = [0u8; 512];
500 memcpy(old_data.as_mut_ptr(), record_ptr, self.record_size);
501
502 let mut new_data = [0u8; 512];
504 memcpy(new_data.as_mut_ptr(), record_data, self.record_size);
505
506 tx_mut.add_log_item(
508 crate::transaction::LogOperation::Update,
509 self.def.id,
510 id as u16,
511 old_data.as_ptr(),
512 new_data.as_ptr(),
513 self.record_size
514 )?;
515 }
516 }
517
518 memcpy(record_ptr, record_data, self.record_size);
520
521 (*status_ptr).version += 1;
523
524 Ok(())
525 }
526
527 pub unsafe fn delete(&mut self, id: usize) -> Result<()> {
529 crate::get_global_db().map(|db| db.metrics.inc_delete_ops());
531 crate::platform::spin_lock(&mut self.lock);
533 defer! { crate::platform::spin_unlock(&mut self.lock); }
534
535 if id >= self.def.max_records {
537 return Err(RemDbError::RecordNotFound);
538 }
539
540 let status_ptr = self.status_array.as_ptr().add(id);
541 if (*status_ptr).status != RecordStatus::Used {
542 return Err(RemDbError::RecordNotFound);
543 }
544
545 if let Some(mut tx) = crate::transaction::get_current_tx() {
547 let tx_mut = tx.as_mut();
548 if tx_mut.is_active() && !tx_mut.is_read_only() {
549 let record_ptr = self.data_start.as_ptr().add(id * self.record_size);
551 let mut old_data = [0u8; 512];
552 memcpy(old_data.as_mut_ptr(), record_ptr, self.record_size);
553
554 tx_mut.add_log_item(
556 crate::transaction::LogOperation::Delete,
557 self.def.id,
558 id as u16,
559 old_data.as_ptr(),
560 core::ptr::null(),
561 self.record_size
562 )?;
563 }
564 }
565
566 (*status_ptr).status = RecordStatus::Free;
568 (*status_ptr).version += 1;
569
570 let record_ptr = self.data_start.as_ptr().add(id * self.record_size);
572 memset(record_ptr, 0, self.record_size);
573
574 if self.free_slot_count < self.def.max_records {
576 *self.free_slots.as_ptr().add(self.free_slot_count) = id;
577 self.free_slot_count += 1;
578 }
579
580 self.record_count -= 1;
582
583 crate::get_global_db().map(|db| db.metrics.sub_used_memory(self.record_size));
585
586 Ok(())
587 }
588
589 pub unsafe fn get_by_id(&self, id: usize, dest: *mut u8) -> Result<()> {
591 crate::get_global_db().map(|db| db.metrics.inc_read_ops());
593 if id >= self.def.max_records {
595 return Err(RemDbError::RecordNotFound);
596 }
597
598 let status_ptr = self.status_array.as_ptr().add(id);
599 if (*status_ptr).status != RecordStatus::Used {
600 return Err(RemDbError::RecordNotFound);
601 }
602
603 let record_ptr = self.data_start.as_ptr().add(id * self.record_size);
605 memcpy(dest, record_ptr, self.record_size);
606
607 Ok(())
608 }
609
610 pub unsafe fn get_field(
612 &self,
613 record_data: *const u8,
614 field_index: usize
615 ) -> Result<Value> {
616 if field_index >= self.def.fields.len() {
618 return Err(RemDbError::FieldNotFound);
619 }
620
621 let field = &self.def.fields[field_index];
622 let field_ptr = record_data.add(field.offset);
623
624 let value = match field.data_type {
626 crate::types::DataType::UInt8 => {
627 Value { u8: *field_ptr as u8 }
628 }
629 crate::types::DataType::UInt16 => {
630 Value { u16: core::ptr::read_unaligned(field_ptr as *const u16) }
631 }
632 crate::types::DataType::UInt32 => {
633 Value { u32: core::ptr::read_unaligned(field_ptr as *const u32) }
634 }
635 crate::types::DataType::UInt64 => {
636 Value { u64: core::ptr::read_unaligned(field_ptr as *const u64) }
637 }
638 crate::types::DataType::Int8 => {
639 Value { i8: core::ptr::read_unaligned(field_ptr as *const i8) }
640 }
641 crate::types::DataType::Int16 => {
642 Value { i16: core::ptr::read_unaligned(field_ptr as *const i16) }
643 }
644 crate::types::DataType::Int32 => {
645 Value { i32: core::ptr::read_unaligned(field_ptr as *const i32) }
646 }
647 crate::types::DataType::Int64 => {
648 Value { i64: core::ptr::read_unaligned(field_ptr as *const i64) }
649 }
650 crate::types::DataType::Float32 => {
651 Value { float32: core::ptr::read_unaligned(field_ptr as *const f32) }
652 }
653 crate::types::DataType::Float64 => {
654 Value { float64: core::ptr::read_unaligned(field_ptr as *const f64) }
655 }
656 crate::types::DataType::Bool => {
657 Value { bool: *field_ptr != 0 }
658 }
659 crate::types::DataType::Timestamp => {
660 Value { timestamp: core::ptr::read_unaligned(field_ptr as *const u64) }
661 }
662 crate::types::DataType::String => {
663 let mut str_value = [0u8; crate::types::MAX_STRING_LEN];
664 memcpy(str_value.as_mut_ptr(), field_ptr, field.size);
665 Value { string: str_value }
666 }
667 };
668
669 Ok(value)
670 }
671
672 pub unsafe fn set_field(
674 &self,
675 record_data: *mut u8,
676 field_index: usize,
677 value: &Value
678 ) -> Result<()> {
679 if field_index >= self.def.fields.len() {
681 return Err(RemDbError::FieldNotFound);
682 }
683
684 let field = &self.def.fields[field_index];
685 let field_ptr = record_data.add(field.offset);
686
687 match field.data_type {
689 crate::types::DataType::UInt8 => {
690 *(field_ptr as *mut u8) = value.u8;
691 }
692 crate::types::DataType::UInt16 => {
693 *(field_ptr as *mut u16) = value.u16;
694 }
695 crate::types::DataType::UInt32 => {
696 *(field_ptr as *mut u32) = value.u32;
697 }
698 crate::types::DataType::UInt64 => {
699 *(field_ptr as *mut u64) = value.u64;
700 }
701 crate::types::DataType::Int8 => {
702 *(field_ptr as *mut i8) = value.i8;
703 }
704 crate::types::DataType::Int16 => {
705 *(field_ptr as *mut i16) = value.i16;
706 }
707 crate::types::DataType::Int32 => {
708 *(field_ptr as *mut i32) = value.i32;
709 }
710 crate::types::DataType::Int64 => {
711 *(field_ptr as *mut i64) = value.i64;
712 }
713 crate::types::DataType::Float32 => {
714 *(field_ptr as *mut f32) = value.float32;
715 }
716 crate::types::DataType::Float64 => {
717 *(field_ptr as *mut f64) = value.float64;
718 }
719 crate::types::DataType::Bool => {
720 *field_ptr = if value.bool { 1 } else { 0 };
721 }
722 crate::types::DataType::Timestamp => {
723 *(field_ptr as *mut u64) = value.timestamp;
724 }
725 crate::types::DataType::String => {
726 memcpy(field_ptr, value.string.as_ptr(), field.size);
727 }
728 }
729
730 Ok(())
731 }
732
733 pub fn record_count(&self) -> usize {
735 self.record_count
736 }
737
738 pub fn max_records(&self) -> usize {
740 self.def.max_records
741 }
742
743 pub fn is_full(&self) -> bool {
745 self.record_count >= self.def.max_records
746 }
747
748 pub fn set_low_power_mode(&mut self, enabled: bool, max_records: Option<usize>) {
750 self.low_power_mode = enabled;
751 self.low_power_max_records = max_records;
752 }
753
754 pub fn is_low_power_mode(&self) -> bool {
756 self.low_power_mode
757 }
758
759 pub unsafe fn iterate<F>(&self, mut f: F) -> Result<()>
761 where F: FnMut(usize, *const u8) -> bool {
762 for i in 0..self.def.max_records {
763 let status_ptr = self.status_array.as_ptr().add(i);
764 if (*status_ptr).status == RecordStatus::Used {
765 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
766 if !f(i, record_ptr) {
767 break;
768 }
769 }
770 }
771
772 Ok(())
773 }
774
775 pub unsafe fn get_status_ptr(&self, index: usize) -> *mut RecordHeader {
777 self.status_array.as_ptr().add(index)
778 }
779
780 pub unsafe fn get_record_ptr(&self, index: usize) -> *const u8 {
782 self.data_start.as_ptr().add(index * self.record_size)
783 }
784
785 pub unsafe fn get_record_ptr_mut(&mut self, index: usize) -> *mut u8 {
787 self.data_start.as_ptr().add(index * self.record_size) as *mut u8
788 }
789
790 pub unsafe fn set_record_count(&mut self, count: usize) {
792 self.record_count = count;
793 }
794
795 pub unsafe fn inc_record_count(&mut self) {
797 self.record_count += 1;
798 }
799
800 pub unsafe fn batch_insert(&mut self, records: *const u8, count: usize, out_ids: *mut usize) -> Result<usize> {
804 if records.is_null() {
806 return Err(RemDbError::UnsupportedOperation);
807 }
808
809 crate::platform::spin_lock(&mut self.lock);
811
812 let max_records = if self.low_power_mode {
814 self.low_power_max_records.unwrap_or(self.def.max_records)
815 } else {
816 self.def.max_records
817 };
818
819 let available = max_records - self.record_count;
821 let mut actual_count = count;
822
823 if self.low_power_mode && self.record_count >= max_records {
824 actual_count = count;
826 } else if available < count {
827 actual_count = available;
829 }
830
831 if actual_count == 0 {
832 crate::platform::spin_unlock(&mut self.lock);
833 return Err(RemDbError::OutOfMemory);
834 }
835
836 let mut slot_ids = [0usize; 256]; assert!(actual_count <= slot_ids.len(), "Batch insert count exceeds maximum");
839
840 let mut inserted_count = 0;
841 let mut i = 0;
842
843 while i < actual_count && self.free_slot_count > 0 {
845 slot_ids[i] = *self.free_slots.as_ptr().add(self.free_slot_count - 1);
846 self.free_slot_count -= 1;
847 inserted_count += 1;
848 i += 1;
849 }
850
851 if i < actual_count && self.low_power_mode {
853 let mut oldest_ids = [0usize; 256];
855 let mut oldest_versions = [u16::MAX; 256];
856
857 for record_id in 0..self.def.max_records {
858 let status_ptr = self.status_array.as_ptr().add(record_id);
859 let status = &*status_ptr;
860 if status.status == crate::types::RecordStatus::Used {
861 for j in 0..(actual_count - i) {
863 if status.version < oldest_versions[j] {
864 for k in (j+1)..(actual_count - i) {
866 if oldest_versions[k] > oldest_versions[k-1] {
867 break;
868 }
869 oldest_ids[k] = oldest_ids[k-1];
870 oldest_versions[k] = oldest_versions[k-1];
871 }
872 oldest_ids[j] = record_id;
873 oldest_versions[j] = status.version;
874 break;
875 }
876 }
877 }
878 }
879
880 for j in 0..(actual_count - i) {
882 slot_ids[i + j] = oldest_ids[j];
883 }
884 inserted_count = actual_count;
885 }
886
887 crate::platform::spin_unlock(&mut self.lock);
889
890 for j in 0..inserted_count {
892 let slot_id = slot_ids[j];
893
894 if !out_ids.is_null() {
896 *out_ids.add(j) = slot_id;
897 }
898
899 let record_ptr = self.data_start.as_ptr().add(slot_id * self.record_size);
901 let src_ptr = records.add(j * self.record_size);
902
903 memcpy(record_ptr, src_ptr, self.record_size);
905
906 let status_ptr = self.status_array.as_ptr().add(slot_id);
908 (*status_ptr).status = crate::types::RecordStatus::Used;
909 (*status_ptr).version += 1;
910 }
911
912 crate::platform::spin_lock(&mut self.lock);
914
915 let new_records_count = if self.low_power_mode && self.record_count >= max_records {
917 0 } else {
919 inserted_count };
921
922 self.record_count += new_records_count;
923 crate::platform::spin_unlock(&mut self.lock);
924
925 Ok(inserted_count)
926 }
927
928 pub unsafe fn time_series_batch_insert(&mut self, records: *const u8, count: usize, out_ids: *mut usize) -> Result<usize> {
932 if records.is_null() {
937 return Err(RemDbError::UnsupportedOperation);
938 }
939
940 crate::platform::spin_lock(&mut self.lock);
942
943 let available = self.def.max_records - self.record_count;
945 let actual_count = core::cmp::min(count, available);
946 let actual_count = core::cmp::min(actual_count, self.free_slot_count);
947
948 if actual_count == 0 {
949 crate::platform::spin_unlock(&mut self.lock);
950 return Err(RemDbError::OutOfMemory);
951 }
952
953 let original_free_slot_count = self.free_slot_count;
955 let end_free_slot = self.free_slot_count - actual_count;
956
957 self.free_slot_count = end_free_slot;
959
960 crate::platform::spin_unlock(&mut self.lock);
962
963 let mut inserted_count = 0;
965
966 for i in 0..actual_count {
967 let free_slot_index = original_free_slot_count - 1 - i;
969 let slot_id = *self.free_slots.as_ptr().add(free_slot_index);
970
971 if !out_ids.is_null() {
973 *out_ids.add(i) = slot_id;
974 }
975
976 let record_ptr = self.data_start.as_ptr().add(slot_id * self.record_size);
978 let src_ptr = records.add(i * self.record_size);
979
980 memcpy(record_ptr, src_ptr, self.record_size);
982
983 let status_ptr = self.status_array.as_ptr().add(slot_id);
985 (*status_ptr).status = RecordStatus::Used;
986
987 inserted_count += 1;
988 }
989
990 crate::platform::spin_lock(&mut self.lock);
992 self.record_count += inserted_count;
993 crate::platform::spin_unlock(&mut self.lock);
994
995 Ok(inserted_count)
996 }
997
998 pub unsafe fn batch_get(&self, ids: &[usize], dest: *mut u8) -> Result<usize> {
1002 let mut success_count = 0;
1003
1004 for (i, &id) in ids.iter().enumerate() {
1005 if id >= self.def.max_records {
1007 continue;
1008 }
1009
1010 let status_ptr = self.status_array.as_ptr().add(id);
1011 if (*status_ptr).status != RecordStatus::Used {
1012 continue;
1013 }
1014
1015 let record_ptr = self.data_start.as_ptr().add(id * self.record_size);
1017 let dest_ptr = dest.add(i * self.record_size);
1018 memcpy(dest_ptr, record_ptr, self.record_size);
1019
1020 success_count += 1;
1021 }
1022
1023 Ok(success_count)
1024 }
1025
1026 pub unsafe fn aggregate_count(
1029 &self,
1030 time_field_index: usize,
1031 start_time: u64,
1032 end_time: u64
1033 ) -> Result<usize> {
1034 if time_field_index >= self.def.fields.len() {
1036 return Err(RemDbError::FieldNotFound);
1037 }
1038
1039 let mut count = 0;
1040 let time_field = &self.def.fields[time_field_index];
1041
1042 for i in 0..self.def.max_records {
1044 let status_ptr = self.status_array.as_ptr().add(i);
1045 if (*status_ptr).status != RecordStatus::Used {
1046 continue;
1047 }
1048
1049 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1050
1051 let timestamp = match time_field.data_type {
1053 crate::types::DataType::UInt64 => {
1054 core::ptr::read_unaligned(
1055 record_ptr.add(time_field.offset) as *const u64
1056 )
1057 },
1058 crate::types::DataType::Timestamp => {
1059 core::ptr::read_unaligned(
1060 record_ptr.add(time_field.offset) as *const u64
1061 )
1062 },
1063 _ => {
1064 let field_ptr = record_ptr.add(time_field.offset);
1066 match time_field.data_type {
1067 crate::types::DataType::UInt8 => core::ptr::read_unaligned(field_ptr as *const u8) as u64,
1068 crate::types::DataType::UInt16 => core::ptr::read_unaligned(field_ptr as *const u16) as u64,
1069 crate::types::DataType::UInt32 => core::ptr::read_unaligned(field_ptr as *const u32) as u64,
1070 crate::types::DataType::Int8 => core::ptr::read_unaligned(field_ptr as *const i8) as u64,
1071 crate::types::DataType::Int16 => core::ptr::read_unaligned(field_ptr as *const i16) as u64,
1072 crate::types::DataType::Int32 => core::ptr::read_unaligned(field_ptr as *const i32) as u64,
1073 crate::types::DataType::Int64 => core::ptr::read_unaligned(field_ptr as *const i64) as u64,
1074 _ => continue, }
1076 }
1077 };
1078
1079 if timestamp >= start_time && timestamp <= end_time {
1080 count += 1;
1081 }
1082 }
1083
1084 Ok(count)
1085 }
1086
1087 pub unsafe fn aggregate_sum(
1090 &self,
1091 time_field_index: usize,
1092 value_field_index: usize,
1093 start_time: u64,
1094 end_time: u64
1095 ) -> Result<f64> {
1096 if time_field_index >= self.def.fields.len() || value_field_index >= self.def.fields.len() {
1098 return Err(RemDbError::FieldNotFound);
1099 }
1100
1101 let mut sum = 0.0;
1102
1103 for i in 0..self.def.max_records {
1105 let status_ptr = self.status_array.as_ptr().add(i);
1106 if (*status_ptr).status != RecordStatus::Used {
1107 continue;
1108 }
1109
1110 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1111
1112 let time_field = &self.def.fields[time_field_index];
1114 let timestamp = match time_field.data_type {
1115 crate::types::DataType::UInt64 => {
1116 core::ptr::read_unaligned(
1117 record_ptr.add(time_field.offset) as *const u64
1118 )
1119 },
1120 crate::types::DataType::Timestamp => {
1121 core::ptr::read_unaligned(
1122 record_ptr.add(time_field.offset) as *const u64
1123 )
1124 },
1125 _ => {
1126 let field_ptr = record_ptr.add(time_field.offset);
1128 match time_field.data_type {
1129 crate::types::DataType::UInt8 => core::ptr::read_unaligned(field_ptr as *const u8) as u64,
1130 crate::types::DataType::UInt16 => core::ptr::read_unaligned(field_ptr as *const u16) as u64,
1131 crate::types::DataType::UInt32 => core::ptr::read_unaligned(field_ptr as *const u32) as u64,
1132 crate::types::DataType::Int8 => core::ptr::read_unaligned(field_ptr as *const i8) as u64,
1133 crate::types::DataType::Int16 => core::ptr::read_unaligned(field_ptr as *const i16) as u64,
1134 crate::types::DataType::Int32 => core::ptr::read_unaligned(field_ptr as *const i32) as u64,
1135 crate::types::DataType::Int64 => core::ptr::read_unaligned(field_ptr as *const i64) as u64,
1136 _ => continue, }
1138 }
1139 };
1140
1141 if timestamp >= start_time && timestamp <= end_time {
1142 let value = self.get_field(record_ptr, value_field_index)?;
1144 let numeric_value = match self.def.fields[value_field_index].data_type {
1145 crate::types::DataType::UInt8 => value.u8 as f64,
1146 crate::types::DataType::UInt16 => value.u16 as f64,
1147 crate::types::DataType::UInt32 => value.u32 as f64,
1148 crate::types::DataType::UInt64 => value.u64 as f64,
1149 crate::types::DataType::Float32 => value.float32 as f64,
1150 crate::types::DataType::Float64 => value.float64,
1151 _ => return Err(RemDbError::TypeMismatch),
1152 };
1153
1154 sum += numeric_value;
1155 }
1156 }
1157
1158 Ok(sum)
1159 }
1160
1161 pub unsafe fn aggregate_avg(
1164 &self,
1165 time_field_index: usize,
1166 value_field_index: usize,
1167 start_time: u64,
1168 end_time: u64
1169 ) -> Result<f64> {
1170 if time_field_index >= self.def.fields.len() || value_field_index >= self.def.fields.len() {
1172 return Err(RemDbError::FieldNotFound);
1173 }
1174
1175 let mut sum = 0.0;
1176 let mut count = 0;
1177
1178 for i in 0..self.def.max_records {
1180 let status_ptr = self.status_array.as_ptr().add(i);
1181 if (*status_ptr).status != RecordStatus::Used {
1182 continue;
1183 }
1184
1185 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1186
1187 let time_field = &self.def.fields[time_field_index];
1189 let timestamp = match time_field.data_type {
1190 crate::types::DataType::UInt64 => {
1191 core::ptr::read_unaligned(
1192 record_ptr.add(time_field.offset) as *const u64
1193 )
1194 },
1195 crate::types::DataType::Timestamp => {
1196 core::ptr::read_unaligned(
1197 record_ptr.add(time_field.offset) as *const u64
1198 )
1199 },
1200 _ => {
1201 let field_ptr = record_ptr.add(time_field.offset);
1203 match time_field.data_type {
1204 crate::types::DataType::UInt8 => core::ptr::read_unaligned(field_ptr as *const u8) as u64,
1205 crate::types::DataType::UInt16 => core::ptr::read_unaligned(field_ptr as *const u16) as u64,
1206 crate::types::DataType::UInt32 => core::ptr::read_unaligned(field_ptr as *const u32) as u64,
1207 crate::types::DataType::Int8 => core::ptr::read_unaligned(field_ptr as *const i8) as u64,
1208 crate::types::DataType::Int16 => core::ptr::read_unaligned(field_ptr as *const i16) as u64,
1209 crate::types::DataType::Int32 => core::ptr::read_unaligned(field_ptr as *const i32) as u64,
1210 crate::types::DataType::Int64 => core::ptr::read_unaligned(field_ptr as *const i64) as u64,
1211 _ => continue, }
1213 }
1214 };
1215
1216 if timestamp >= start_time && timestamp <= end_time {
1217 let value = self.get_field(record_ptr, value_field_index)?;
1219 let numeric_value = match self.def.fields[value_field_index].data_type {
1220 crate::types::DataType::UInt8 => value.u8 as f64,
1221 crate::types::DataType::UInt16 => value.u16 as f64,
1222 crate::types::DataType::UInt32 => value.u32 as f64,
1223 crate::types::DataType::UInt64 => value.u64 as f64,
1224 crate::types::DataType::Float32 => value.float32 as f64,
1225 crate::types::DataType::Float64 => value.float64,
1226 _ => return Err(RemDbError::TypeMismatch),
1227 };
1228
1229 sum += numeric_value;
1230 count += 1;
1231 }
1232 }
1233
1234 if count == 0 {
1235 Ok(0.0)
1236 } else {
1237 Ok(sum / count as f64)
1238 }
1239 }
1240
1241 pub unsafe fn aggregate_min(
1244 &self,
1245 time_field_index: usize,
1246 value_field_index: usize,
1247 start_time: u64,
1248 end_time: u64
1249 ) -> Result<f64> {
1250 if time_field_index >= self.def.fields.len() || value_field_index >= self.def.fields.len() {
1252 return Err(RemDbError::FieldNotFound);
1253 }
1254
1255 let mut min_value: Option<f64> = None;
1256
1257 for i in 0..self.def.max_records {
1259 let status_ptr = self.status_array.as_ptr().add(i);
1260 if (*status_ptr).status != RecordStatus::Used {
1261 continue;
1262 }
1263
1264 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1265
1266 let time_field = &self.def.fields[time_field_index];
1268 let timestamp = match time_field.data_type {
1269 crate::types::DataType::UInt64 => {
1270 core::ptr::read_unaligned(
1271 record_ptr.add(time_field.offset) as *const u64
1272 )
1273 },
1274 crate::types::DataType::Timestamp => {
1275 core::ptr::read_unaligned(
1276 record_ptr.add(time_field.offset) as *const u64
1277 )
1278 },
1279 _ => {
1280 let field_ptr = record_ptr.add(time_field.offset);
1282 match time_field.data_type {
1283 crate::types::DataType::UInt8 => core::ptr::read_unaligned(field_ptr as *const u8) as u64,
1284 crate::types::DataType::UInt16 => core::ptr::read_unaligned(field_ptr as *const u16) as u64,
1285 crate::types::DataType::UInt32 => core::ptr::read_unaligned(field_ptr as *const u32) as u64,
1286 crate::types::DataType::Int8 => core::ptr::read_unaligned(field_ptr as *const i8) as u64,
1287 crate::types::DataType::Int16 => core::ptr::read_unaligned(field_ptr as *const i16) as u64,
1288 crate::types::DataType::Int32 => core::ptr::read_unaligned(field_ptr as *const i32) as u64,
1289 crate::types::DataType::Int64 => core::ptr::read_unaligned(field_ptr as *const i64) as u64,
1290 _ => continue, }
1292 }
1293 };
1294
1295 if timestamp >= start_time && timestamp <= end_time {
1296 let value = self.get_field(record_ptr, value_field_index)?;
1298 let numeric_value = match self.def.fields[value_field_index].data_type {
1299 crate::types::DataType::UInt8 => value.u8 as f64,
1300 crate::types::DataType::UInt16 => value.u16 as f64,
1301 crate::types::DataType::UInt32 => value.u32 as f64,
1302 crate::types::DataType::UInt64 => value.u64 as f64,
1303 crate::types::DataType::Float32 => value.float32 as f64,
1304 crate::types::DataType::Float64 => value.float64,
1305 _ => return Err(RemDbError::TypeMismatch),
1306 };
1307
1308 if let Some(current_min) = min_value {
1309 if numeric_value < current_min {
1310 min_value = Some(numeric_value);
1311 }
1312 } else {
1313 min_value = Some(numeric_value);
1314 }
1315 }
1316 }
1317
1318 min_value.ok_or(RemDbError::RecordNotFound)
1319 }
1320
1321 unsafe fn read_timestamp_value(&self, record_ptr: *const u8, time_field_index: usize) -> Option<u64> {
1323 let time_field = &self.def.fields[time_field_index];
1324 match time_field.data_type {
1325 crate::types::DataType::UInt64 => Some(
1326 core::ptr::read_unaligned(
1327 record_ptr.add(time_field.offset) as *const u64
1328 )
1329 ),
1330 crate::types::DataType::Timestamp => Some(
1331 core::ptr::read_unaligned(
1332 record_ptr.add(time_field.offset) as *const u64
1333 )
1334 ),
1335 crate::types::DataType::UInt8 => Some(
1336 core::ptr::read_unaligned(
1337 record_ptr.add(time_field.offset) as *const u8
1338 ) as u64
1339 ),
1340 crate::types::DataType::UInt16 => Some(
1341 core::ptr::read_unaligned(
1342 record_ptr.add(time_field.offset) as *const u16
1343 ) as u64
1344 ),
1345 crate::types::DataType::UInt32 => Some(
1346 core::ptr::read_unaligned(
1347 record_ptr.add(time_field.offset) as *const u32
1348 ) as u64
1349 ),
1350 crate::types::DataType::Int8 => Some(
1351 core::ptr::read_unaligned(
1352 record_ptr.add(time_field.offset) as *const i8
1353 ) as u64
1354 ),
1355 crate::types::DataType::Int16 => Some(
1356 core::ptr::read_unaligned(
1357 record_ptr.add(time_field.offset) as *const i16
1358 ) as u64
1359 ),
1360 crate::types::DataType::Int32 => Some(
1361 core::ptr::read_unaligned(
1362 record_ptr.add(time_field.offset) as *const i32
1363 ) as u64
1364 ),
1365 crate::types::DataType::Int64 => Some(
1366 core::ptr::read_unaligned(
1367 record_ptr.add(time_field.offset) as *const i64
1368 ) as u64
1369 ),
1370 _ => None, }
1372 }
1373
1374 pub unsafe fn aggregate_max(
1377 &self,
1378 time_field_index: usize,
1379 value_field_index: usize,
1380 start_time: u64,
1381 end_time: u64
1382 ) -> Result<f64> {
1383 if time_field_index >= self.def.fields.len() || value_field_index >= self.def.fields.len() {
1385 return Err(RemDbError::FieldNotFound);
1386 }
1387
1388 let mut max_value: Option<f64> = None;
1389
1390 for i in 0..self.def.max_records {
1392 let status_ptr = self.status_array.as_ptr().add(i);
1393 if (*status_ptr).status != RecordStatus::Used {
1394 continue;
1395 }
1396
1397 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1398
1399 let Some(timestamp) = self.read_timestamp_value(record_ptr, time_field_index) else {
1401 continue;
1402 };
1403
1404 if timestamp >= start_time && timestamp <= end_time {
1405 let value = self.get_field(record_ptr, value_field_index)?;
1407 let numeric_value = match self.def.fields[value_field_index].data_type {
1408 crate::types::DataType::UInt8 => value.u8 as f64,
1409 crate::types::DataType::UInt16 => value.u16 as f64,
1410 crate::types::DataType::UInt32 => value.u32 as f64,
1411 crate::types::DataType::UInt64 => value.u64 as f64,
1412 crate::types::DataType::Float32 => value.float32 as f64,
1413 crate::types::DataType::Float64 => value.float64,
1414 _ => return Err(RemDbError::TypeMismatch),
1415 };
1416
1417 if let Some(current_max) = max_value {
1418 if numeric_value > current_max {
1419 max_value = Some(numeric_value);
1420 }
1421 } else {
1422 max_value = Some(numeric_value);
1423 }
1424 }
1425 }
1426
1427 max_value.ok_or(RemDbError::RecordNotFound)
1428 }
1429
1430 pub unsafe fn get_latest_records(
1433 &self,
1434 time_field_index: usize,
1435 count: usize,
1436 dest: *mut u8
1437 ) -> Result<usize> {
1438 if time_field_index >= self.def.fields.len() {
1440 return Err(RemDbError::FieldNotFound);
1441 }
1442
1443 if dest.is_null() {
1445 return Err(RemDbError::UnsupportedOperation);
1446 }
1447
1448 if self.record_count == 0 {
1450 return Ok(0);
1451 }
1452
1453 let mut record_times = [(0usize, 0u64); 1024]; let mut record_count = 0;
1457
1458 for i in 0..self.def.max_records {
1460 let status_ptr = self.status_array.as_ptr().add(i);
1461 if (*status_ptr).status != RecordStatus::Used {
1462 continue;
1463 }
1464
1465 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1466
1467 if let Some(timestamp) = self.read_timestamp_value(record_ptr, time_field_index) {
1469 record_times[record_count] = (i, timestamp);
1470 record_count += 1;
1471 };
1472 }
1473
1474 for i in 0..record_count {
1477 for j in i+1..record_count {
1478 if record_times[i].1 < record_times[j].1 {
1479 record_times.swap(i, j);
1480 }
1481 }
1482 }
1483
1484 let actual_count = core::cmp::min(count, record_count);
1486 for i in 0..actual_count {
1487 let (record_id, _) = record_times[i];
1488 let src_ptr = self.data_start.as_ptr().add(record_id * self.record_size);
1489 let dest_ptr = dest.add(i * self.record_size);
1490 memcpy(dest_ptr, src_ptr, self.record_size);
1491 }
1492
1493 Ok(actual_count)
1494 }
1495
1496 pub unsafe fn get_records_in_time_window(
1499 &self,
1500 time_field_index: usize,
1501 start_time: u64,
1502 end_time: u64,
1503 dest: *mut u8,
1504 max_records: usize
1505 ) -> Result<usize> {
1506 if time_field_index >= self.def.fields.len() {
1508 return Err(RemDbError::FieldNotFound);
1509 }
1510
1511 if dest.is_null() {
1513 return Err(RemDbError::UnsupportedOperation);
1514 }
1515
1516 if self.record_count == 0 {
1518 return Ok(0);
1519 }
1520
1521 let mut matched_records = [(0usize, 0u64); 1024]; let mut match_count = 0;
1524
1525 for i in 0..self.def.max_records {
1527 let status_ptr = self.status_array.as_ptr().add(i);
1528 if (*status_ptr).status != RecordStatus::Used {
1529 continue;
1530 }
1531
1532 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1533
1534 if let Some(timestamp) = self.read_timestamp_value(record_ptr, time_field_index) {
1536 if timestamp >= start_time && timestamp <= end_time {
1537 matched_records[match_count] = (i, timestamp);
1538 match_count += 1;
1539 }
1540 };
1541 }
1542
1543 for i in 0..match_count {
1546 for j in i+1..match_count {
1547 if matched_records[i].1 > matched_records[j].1 {
1548 matched_records.swap(i, j);
1549 }
1550 }
1551 }
1552
1553 let actual_count = core::cmp::min(max_records, match_count);
1555 for i in 0..actual_count {
1556 let (record_id, _) = matched_records[i];
1557 let src_ptr = self.data_start.as_ptr().add(record_id * self.record_size);
1558 let dest_ptr = dest.add(i * self.record_size);
1559 memcpy(dest_ptr, src_ptr, self.record_size);
1560 }
1561
1562 Ok(actual_count)
1563 }
1564
1565 #[cfg(feature = "std")]
1568 pub unsafe fn get_aggregate_in_time_window(
1569 &self,
1570 time_field_index: usize,
1571 value_field_index: usize,
1572 start_time: u64,
1573 end_time: u64,
1574 window_size: u64
1575 ) -> Result<Vec<(u64, f64, f64, f64, f64, usize)>> {
1576 if time_field_index >= self.def.fields.len() || value_field_index >= self.def.fields.len() {
1578 return Err(RemDbError::FieldNotFound);
1579 }
1580
1581 let time_field = &self.def.fields[time_field_index];
1582 let value_field = &self.def.fields[value_field_index];
1583
1584 if time_field.data_type != crate::types::DataType::Timestamp &&
1586 time_field.data_type != crate::types::DataType::UInt64 {
1587 return Err(RemDbError::TypeMismatch);
1588 }
1589
1590 use alloc::collections::BTreeMap;
1592 let mut window_aggregates: BTreeMap<u64, (f64, f64, f64, f64, usize)> = BTreeMap::new();
1593
1594 for i in 0..self.def.max_records {
1596 let status_ptr = self.status_array.as_ptr().add(i);
1597 if (*status_ptr).status != RecordStatus::Used {
1598 continue;
1599 }
1600
1601 let record_ptr = self.data_start.as_ptr().add(i * self.record_size);
1602
1603 let Some(time_value) = self.read_timestamp_value(record_ptr, time_field_index) else {
1605 continue;
1606 };
1607
1608 if time_value >= start_time && time_value <= end_time {
1609 let value = self.get_field(record_ptr, value_field_index)?;
1611 let numeric_value = match value_field.data_type {
1612 crate::types::DataType::UInt8 => value.u8 as f64,
1613 crate::types::DataType::UInt16 => value.u16 as f64,
1614 crate::types::DataType::UInt32 => value.u32 as f64,
1615 crate::types::DataType::UInt64 => value.u64 as f64,
1616 crate::types::DataType::Float32 => value.float32 as f64,
1617 crate::types::DataType::Float64 => value.float64,
1618 _ => return Err(RemDbError::TypeMismatch),
1619 };
1620
1621 let window_key = time_value - (time_value % window_size);
1623
1624 let entry = window_aggregates.entry(window_key).or_insert((0.0, numeric_value, numeric_value, 0.0, 0));
1626 entry.0 += numeric_value; if numeric_value < entry.1 { entry.1 = numeric_value; } if numeric_value > entry.2 { entry.2 = numeric_value; } entry.3 = numeric_value; entry.4 += 1; }
1632 }
1633
1634 let mut result = Vec::with_capacity(window_aggregates.len());
1636 for (window_start, (sum, min, max, last, count)) in window_aggregates {
1637 let avg = if count > 0 { sum / count as f64 } else { 0.0 };
1638 result.push((window_start, sum, avg, min, max, count));
1639 }
1640
1641 Ok(result)
1642 }
1643
1644 #[cfg(not(feature = "std"))]
1646 pub unsafe fn get_aggregate_in_time_window(
1647 &self,
1648 _time_field_index: usize,
1649 _value_field_index: usize,
1650 _start_time: u64,
1651 _end_time: u64,
1652 _window_size: u64
1653 ) -> Result<()> {
1654 Err(RemDbError::UnsupportedOperation)
1656 }
1657}
1658
1659#[macro_export]
1661macro_rules! defer {
1662 ($($code:tt)*) => {
1663 let _defer = $crate::table::Defer::new(|| { $($code)* });
1664 };
1665}
1666
1667pub struct Defer<F: FnMut()>(Option<F>);
1669
1670impl<F: FnMut()> Defer<F> {
1671 pub fn new(f: F) -> Self {
1673 Defer(Some(f))
1674 }
1675}
1676
1677impl<F: FnMut()> Drop for Defer<F> {
1678 fn drop(&mut self) {
1679 if let Some(mut f) = self.0.take() {
1680 f();
1681 }
1682 }
1683}