1use std::fs::{self, File};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::*;
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow::record_batch::RecordBatch;
13use chrono::{DateTime, NaiveDate};
14use dashmap::DashMap;
15use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
16use parquet::arrow::ArrowWriter;
17use parquet::basic::Compression;
18use parquet::file::properties::WriterProperties;
19use tokio::sync::Mutex as TokioMutex;
20
21use crate::db::models::{
22 Activity, DailyHealth, PerformanceMetrics, Profile, TrackPoint, WeightEntry,
23};
24use crate::error::{GarminError, Result};
25
26use super::partitions::EntityType;
27
/// Date-partitioned Parquet persistence layer.
///
/// Each entity type is stored as one Parquet file per partition key
/// (see `EntityType::partition_key`), all rooted under `base_path`.
#[derive(Clone)]
pub struct ParquetStore {
    /// Root directory under which all partition directories/files are created.
    base_path: PathBuf,
    /// Per-partition async mutexes used by the `*_async` upsert methods to
    /// serialize read-modify-write cycles. Entries are created lazily and
    /// never removed.
    partition_locks: Arc<DashMap<String, Arc<TokioMutex<()>>>>,
}
38
39impl ParquetStore {
40 pub fn new(base_path: impl Into<PathBuf>) -> Self {
42 Self {
43 base_path: base_path.into(),
44 partition_locks: Arc::new(DashMap::new()),
45 }
46 }
47
48 fn get_partition_lock(&self, partition_key: &str) -> Arc<TokioMutex<()>> {
50 self.partition_locks
51 .entry(partition_key.to_string())
52 .or_insert_with(|| Arc::new(TokioMutex::new(())))
53 .clone()
54 }
55
56 pub fn has_daily_health(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
58 let key = EntityType::DailyHealth.partition_key(date);
59 let path = self.partition_path(EntityType::DailyHealth, &key);
60 if !path.exists() {
61 return Ok(false);
62 }
63 let records = self.read_daily_health_from_path(&path)?;
64 Ok(records
65 .iter()
66 .any(|r| r.profile_id == profile_id && r.date == date))
67 }
68
69 pub fn has_performance_metrics(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
71 let key = EntityType::PerformanceMetrics.partition_key(date);
72 let path = self.partition_path(EntityType::PerformanceMetrics, &key);
73 if !path.exists() {
74 return Ok(false);
75 }
76 let records = self.read_performance_metrics_from_path(&path)?;
77 Ok(records
78 .iter()
79 .any(|r| r.profile_id == profile_id && r.date == date))
80 }
81
82 pub fn has_track_points(&self, activity_id: i64, date: NaiveDate) -> Result<bool> {
84 let key = EntityType::TrackPoints.partition_key(date);
85 let path = self.partition_path(EntityType::TrackPoints, &key);
86 if !path.exists() {
87 return Ok(false);
88 }
89 let records = self.read_track_points_from_path(&path)?;
90 Ok(records.iter().any(|r| r.activity_id == activity_id))
91 }
92
93 pub fn partition_path(&self, entity: EntityType, partition_key: &str) -> PathBuf {
95 match entity {
96 EntityType::Profiles => self.base_path.join("profiles.parquet"),
97 _ => self
98 .base_path
99 .join(entity.dir_name())
100 .join(format!("{}.parquet", partition_key)),
101 }
102 }
103
104 fn ensure_dir(&self, entity: EntityType) -> Result<()> {
106 if entity != EntityType::Profiles {
107 let dir = self.base_path.join(entity.dir_name());
108 fs::create_dir_all(&dir).map_err(|e| {
109 GarminError::Database(format!("Failed to create directory {:?}: {}", dir, e))
110 })?;
111 } else {
112 fs::create_dir_all(&self.base_path).map_err(|e| {
113 GarminError::Database(format!(
114 "Failed to create directory {:?}: {}",
115 self.base_path, e
116 ))
117 })?;
118 }
119 Ok(())
120 }
121
122 fn write_batch(&self, path: &Path, batch: &RecordBatch) -> Result<()> {
124 let temp_path = path.with_extension("parquet.tmp");
126
127 if let Some(parent) = path.parent() {
129 fs::create_dir_all(parent)
130 .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
131 }
132
133 let file = File::create(&temp_path)
134 .map_err(|e| GarminError::Database(format!("Failed to create temp file: {}", e)))?;
135
136 let props = WriterProperties::builder()
137 .set_compression(Compression::ZSTD(Default::default()))
138 .build();
139
140 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
141 GarminError::Database(format!("Failed to create Parquet writer: {}", e))
142 })?;
143
144 writer
145 .write(batch)
146 .map_err(|e| GarminError::Database(format!("Failed to write batch: {}", e)))?;
147
148 writer
149 .close()
150 .map_err(|e| GarminError::Database(format!("Failed to close writer: {}", e)))?;
151
152 fs::rename(&temp_path, path)
154 .map_err(|e| GarminError::Database(format!("Failed to rename temp file: {}", e)))?;
155
156 Ok(())
157 }
158
159 fn read_batches(&self, path: &Path) -> Result<Vec<RecordBatch>> {
161 if !path.exists() {
162 return Ok(Vec::new());
163 }
164
165 let file = File::open(path)
166 .map_err(|e| GarminError::Database(format!("Failed to open file: {}", e)))?;
167
168 let reader = ParquetRecordBatchReaderBuilder::try_new(file)
169 .map_err(|e| GarminError::Database(format!("Failed to create reader: {}", e)))?
170 .build()
171 .map_err(|e| GarminError::Database(format!("Failed to build reader: {}", e)))?;
172
173 reader
174 .collect::<std::result::Result<Vec<_>, _>>()
175 .map_err(|e| GarminError::Database(format!("Failed to read batches: {}", e)))
176 }
177
178 pub fn write_activities(&self, activities: &[Activity]) -> Result<()> {
184 self.ensure_dir(EntityType::Activities)?;
185
186 let mut partitions: std::collections::HashMap<String, Vec<&Activity>> =
188 std::collections::HashMap::new();
189
190 for activity in activities {
191 if let Some(start_time) = activity.start_time_local {
192 let key = EntityType::Activities.partition_key(start_time.date_naive());
193 partitions.entry(key).or_default().push(activity);
194 }
195 }
196
197 for (key, partition_activities) in partitions {
199 let path = self.partition_path(EntityType::Activities, &key);
200 let batch = Self::activities_to_batch(partition_activities)?;
201 self.write_batch(&path, &batch)?;
202 }
203
204 Ok(())
205 }
206
207 pub fn upsert_activities(&self, activities: &[Activity]) -> Result<()> {
209 self.ensure_dir(EntityType::Activities)?;
210
211 let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
213 std::collections::HashMap::new();
214
215 for activity in activities {
216 if let Some(start_time) = activity.start_time_local {
217 let key = EntityType::Activities.partition_key(start_time.date_naive());
218 partitions.entry(key).or_default().push(activity.clone());
219 }
220 }
221
222 for (key, mut new_activities) in partitions {
224 let path = self.partition_path(EntityType::Activities, &key);
225
226 let mut existing = self.read_activities_from_path(&path)?;
228
229 let new_ids: std::collections::HashSet<i64> =
231 new_activities.iter().map(|a| a.activity_id).collect();
232
233 existing.retain(|a| !new_ids.contains(&a.activity_id));
235
236 existing.append(&mut new_activities);
238
239 existing.sort_by_key(|a| a.activity_id);
241
242 let refs: Vec<&Activity> = existing.iter().collect();
244 let batch = Self::activities_to_batch(refs)?;
245 self.write_batch(&path, &batch)?;
246 }
247
248 Ok(())
249 }
250
    /// Async variant of `upsert_activities` that serializes concurrent writers
    /// to the same partition via a per-partition `tokio` mutex, preventing
    /// lost updates in the read-modify-write cycle below.
    ///
    /// NOTE(review): the file reads/writes here are blocking std I/O executed
    /// directly on the async runtime thread — consider `spawn_blocking` if
    /// partitions grow large; verify against runtime usage.
    pub async fn upsert_activities_async(&self, activities: &[Activity]) -> Result<()> {
        self.ensure_dir(EntityType::Activities)?;

        // Group incoming activities by date partition; rows without a local
        // start time cannot be partitioned and are skipped.
        let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
            std::collections::HashMap::new();

        for activity in activities {
            if let Some(start_time) = activity.start_time_local {
                let key = EntityType::Activities.partition_key(start_time.date_naive());
                partitions.entry(key).or_default().push(activity.clone());
            }
        }

        for (key, mut new_activities) in partitions {
            // The guard must stay alive for the entire read-modify-write of
            // this partition; it drops at the end of each loop iteration.
            let lock = self.get_partition_lock(&key);
            let _guard = lock.lock().await;

            let path = self.partition_path(EntityType::Activities, &key);

            let mut existing = self.read_activities_from_path(&path)?;

            // Incoming rows replace stored rows with the same activity_id.
            let new_ids: std::collections::HashSet<i64> =
                new_activities.iter().map(|a| a.activity_id).collect();

            existing.retain(|a| !new_ids.contains(&a.activity_id));

            existing.append(&mut new_activities);

            existing.sort_by_key(|a| a.activity_id);

            let refs: Vec<&Activity> = existing.iter().collect();
            let batch = Self::activities_to_batch(refs)?;
            self.write_batch(&path, &batch)?;
        }

        Ok(())
    }
298
299 fn read_activities_from_path(&self, path: &Path) -> Result<Vec<Activity>> {
300 let batches = self.read_batches(path)?;
301 let mut activities = Vec::new();
302
303 for batch in batches {
304 activities.extend(Self::batch_to_activities(&batch)?);
305 }
306
307 Ok(activities)
308 }
309
    /// Encode a set of activities into a single Arrow `RecordBatch`.
    ///
    /// The schema declared here is positional and MUST stay in lockstep with
    /// the column indices (0..=28) read back in `batch_to_activities`.
    /// Optional fields become nullable Arrow columns; timestamps are stored as
    /// microseconds since the Unix epoch with no timezone.
    fn activities_to_batch(activities: Vec<&Activity>) -> Result<RecordBatch> {
        // One Arrow array per column, built from the struct fields.
        let activity_id: Int64Array = activities.iter().map(|a| a.activity_id).collect();
        let profile_id: Int32Array = activities.iter().map(|a| a.profile_id).collect();
        let activity_name: StringArray = activities
            .iter()
            .map(|a| a.activity_name.as_deref())
            .collect();
        let activity_type: StringArray = activities
            .iter()
            .map(|a| a.activity_type.as_deref())
            .collect();
        let start_time_local: TimestampMicrosecondArray = activities
            .iter()
            .map(|a| a.start_time_local.map(|t| t.timestamp_micros()))
            .collect();
        let start_time_gmt: TimestampMicrosecondArray = activities
            .iter()
            .map(|a| a.start_time_gmt.map(|t| t.timestamp_micros()))
            .collect();
        let duration_sec: Float64Array = activities.iter().map(|a| a.duration_sec).collect();
        let distance_m: Float64Array = activities.iter().map(|a| a.distance_m).collect();
        let calories: Int32Array = activities.iter().map(|a| a.calories).collect();
        let avg_hr: Int32Array = activities.iter().map(|a| a.avg_hr).collect();
        let max_hr: Int32Array = activities.iter().map(|a| a.max_hr).collect();
        let avg_speed: Float64Array = activities.iter().map(|a| a.avg_speed).collect();
        let max_speed: Float64Array = activities.iter().map(|a| a.max_speed).collect();
        let elevation_gain: Float64Array = activities.iter().map(|a| a.elevation_gain).collect();
        let elevation_loss: Float64Array = activities.iter().map(|a| a.elevation_loss).collect();
        let avg_cadence: Float64Array = activities.iter().map(|a| a.avg_cadence).collect();
        let avg_power: Int32Array = activities.iter().map(|a| a.avg_power).collect();
        let normalized_power: Int32Array = activities.iter().map(|a| a.normalized_power).collect();
        let training_effect: Float64Array = activities.iter().map(|a| a.training_effect).collect();
        let training_load: Float64Array = activities.iter().map(|a| a.training_load).collect();
        let start_lat: Float64Array = activities.iter().map(|a| a.start_lat).collect();
        let start_lon: Float64Array = activities.iter().map(|a| a.start_lon).collect();
        let end_lat: Float64Array = activities.iter().map(|a| a.end_lat).collect();
        let end_lon: Float64Array = activities.iter().map(|a| a.end_lon).collect();
        let ground_contact_time: Float64Array =
            activities.iter().map(|a| a.ground_contact_time).collect();
        let vertical_oscillation: Float64Array =
            activities.iter().map(|a| a.vertical_oscillation).collect();
        let stride_length: Float64Array = activities.iter().map(|a| a.stride_length).collect();
        let location_name: StringArray = activities
            .iter()
            .map(|a| a.location_name.as_deref())
            .collect();
        // raw_json is serialized to its text form for storage; it is parsed
        // back with serde_json in `batch_to_activities`.
        let raw_json: StringArray = activities
            .iter()
            .map(|a| a.raw_json.as_ref().map(|j| j.to_string()))
            .collect();

        // Column order here defines the positional indices used on read-back.
        let schema = Arc::new(Schema::new(vec![
            Field::new("activity_id", DataType::Int64, false),
            Field::new("profile_id", DataType::Int32, false),
            Field::new("activity_name", DataType::Utf8, true),
            Field::new("activity_type", DataType::Utf8, true),
            Field::new(
                "start_time_local",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "start_time_gmt",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
                true,
            ),
            Field::new("duration_sec", DataType::Float64, true),
            Field::new("distance_m", DataType::Float64, true),
            Field::new("calories", DataType::Int32, true),
            Field::new("avg_hr", DataType::Int32, true),
            Field::new("max_hr", DataType::Int32, true),
            Field::new("avg_speed", DataType::Float64, true),
            Field::new("max_speed", DataType::Float64, true),
            Field::new("elevation_gain", DataType::Float64, true),
            Field::new("elevation_loss", DataType::Float64, true),
            Field::new("avg_cadence", DataType::Float64, true),
            Field::new("avg_power", DataType::Int32, true),
            Field::new("normalized_power", DataType::Int32, true),
            Field::new("training_effect", DataType::Float64, true),
            Field::new("training_load", DataType::Float64, true),
            Field::new("start_lat", DataType::Float64, true),
            Field::new("start_lon", DataType::Float64, true),
            Field::new("end_lat", DataType::Float64, true),
            Field::new("end_lon", DataType::Float64, true),
            Field::new("ground_contact_time", DataType::Float64, true),
            Field::new("vertical_oscillation", DataType::Float64, true),
            Field::new("stride_length", DataType::Float64, true),
            Field::new("location_name", DataType::Utf8, true),
            Field::new("raw_json", DataType::Utf8, true),
        ]));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(activity_id),
                Arc::new(profile_id),
                Arc::new(activity_name),
                Arc::new(activity_type),
                Arc::new(start_time_local),
                Arc::new(start_time_gmt),
                Arc::new(duration_sec),
                Arc::new(distance_m),
                Arc::new(calories),
                Arc::new(avg_hr),
                Arc::new(max_hr),
                Arc::new(avg_speed),
                Arc::new(max_speed),
                Arc::new(elevation_gain),
                Arc::new(elevation_loss),
                Arc::new(avg_cadence),
                Arc::new(avg_power),
                Arc::new(normalized_power),
                Arc::new(training_effect),
                Arc::new(training_load),
                Arc::new(start_lat),
                Arc::new(start_lon),
                Arc::new(end_lat),
                Arc::new(end_lon),
                Arc::new(ground_contact_time),
                Arc::new(vertical_oscillation),
                Arc::new(stride_length),
                Arc::new(location_name),
                Arc::new(raw_json),
            ],
        )
        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
    }
437
    /// Decode a `RecordBatch` back into owned `Activity` rows.
    ///
    /// Columns are addressed positionally (0..=28) and must match the schema
    /// emitted by `activities_to_batch`. The `unwrap()`ed downcasts panic if
    /// the on-disk schema has drifted from the writer's.
    fn batch_to_activities(batch: &RecordBatch) -> Result<Vec<Activity>> {
        let len = batch.num_rows();
        let mut activities = Vec::with_capacity(len);

        // Downcast every column to its concrete Arrow array type up front.
        let activity_id = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let profile_id = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let activity_name = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let activity_type = batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let start_time_local = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        let start_time_gmt = batch
            .column(5)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        let duration_sec = batch
            .column(6)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let distance_m = batch
            .column(7)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let calories = batch
            .column(8)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let avg_hr = batch
            .column(9)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let max_hr = batch
            .column(10)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let avg_speed = batch
            .column(11)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let max_speed = batch
            .column(12)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let elevation_gain = batch
            .column(13)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let elevation_loss = batch
            .column(14)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let avg_cadence = batch
            .column(15)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let avg_power = batch
            .column(16)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let normalized_power = batch
            .column(17)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let training_effect = batch
            .column(18)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let training_load = batch
            .column(19)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let start_lat = batch
            .column(20)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let start_lon = batch
            .column(21)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let end_lat = batch
            .column(22)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let end_lon = batch
            .column(23)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let ground_contact_time = batch
            .column(24)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let vertical_oscillation = batch
            .column(25)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let stride_length = batch
            .column(26)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let location_name = batch
            .column(27)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let raw_json = batch
            .column(28)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        // Rebuild each row; `is_valid` gates nullable columns back to Option.
        for i in 0..len {
            activities.push(Activity {
                activity_id: activity_id.value(i),
                profile_id: profile_id.value(i),
                activity_name: activity_name
                    .is_valid(i)
                    .then(|| activity_name.value(i).to_string()),
                activity_type: activity_type
                    .is_valid(i)
                    .then(|| activity_type.value(i).to_string()),
                // NOTE(review): out-of-range microsecond values silently fall
                // back to the DateTime default (epoch) — confirm acceptable.
                start_time_local: start_time_local.is_valid(i).then(|| {
                    DateTime::from_timestamp_micros(start_time_local.value(i)).unwrap_or_default()
                }),
                start_time_gmt: start_time_gmt.is_valid(i).then(|| {
                    DateTime::from_timestamp_micros(start_time_gmt.value(i)).unwrap_or_default()
                }),
                duration_sec: duration_sec.is_valid(i).then(|| duration_sec.value(i)),
                distance_m: distance_m.is_valid(i).then(|| distance_m.value(i)),
                calories: calories.is_valid(i).then(|| calories.value(i)),
                avg_hr: avg_hr.is_valid(i).then(|| avg_hr.value(i)),
                max_hr: max_hr.is_valid(i).then(|| max_hr.value(i)),
                avg_speed: avg_speed.is_valid(i).then(|| avg_speed.value(i)),
                max_speed: max_speed.is_valid(i).then(|| max_speed.value(i)),
                elevation_gain: elevation_gain.is_valid(i).then(|| elevation_gain.value(i)),
                elevation_loss: elevation_loss.is_valid(i).then(|| elevation_loss.value(i)),
                avg_cadence: avg_cadence.is_valid(i).then(|| avg_cadence.value(i)),
                avg_power: avg_power.is_valid(i).then(|| avg_power.value(i)),
                normalized_power: normalized_power
                    .is_valid(i)
                    .then(|| normalized_power.value(i)),
                training_effect: training_effect
                    .is_valid(i)
                    .then(|| training_effect.value(i)),
                training_load: training_load.is_valid(i).then(|| training_load.value(i)),
                start_lat: start_lat.is_valid(i).then(|| start_lat.value(i)),
                start_lon: start_lon.is_valid(i).then(|| start_lon.value(i)),
                end_lat: end_lat.is_valid(i).then(|| end_lat.value(i)),
                end_lon: end_lon.is_valid(i).then(|| end_lon.value(i)),
                ground_contact_time: ground_contact_time
                    .is_valid(i)
                    .then(|| ground_contact_time.value(i)),
                vertical_oscillation: vertical_oscillation
                    .is_valid(i)
                    .then(|| vertical_oscillation.value(i)),
                stride_length: stride_length.is_valid(i).then(|| stride_length.value(i)),
                location_name: location_name
                    .is_valid(i)
                    .then(|| location_name.value(i).to_string()),
                // NOTE(review): unparseable stored JSON degrades to the
                // default value rather than erroring — confirm acceptable.
                raw_json: raw_json
                    .is_valid(i)
                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
            });
        }

        Ok(activities)
    }
644
645 pub fn upsert_daily_health(&self, records: &[DailyHealth]) -> Result<()> {
651 self.ensure_dir(EntityType::DailyHealth)?;
652
653 let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
655 std::collections::HashMap::new();
656
657 for record in records {
658 let key = EntityType::DailyHealth.partition_key(record.date);
659 partitions.entry(key).or_default().push(record.clone());
660 }
661
662 for (key, mut new_records) in partitions {
664 let path = self.partition_path(EntityType::DailyHealth, &key);
665
666 let mut existing = self.read_daily_health_from_path(&path)?;
668
669 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
671 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
672
673 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
675
676 existing.append(&mut new_records);
678
679 existing.sort_by_key(|r| (r.profile_id, r.date));
681
682 let refs: Vec<&DailyHealth> = existing.iter().collect();
684 let batch = Self::daily_health_to_batch(refs)?;
685 self.write_batch(&path, &batch)?;
686 }
687
688 Ok(())
689 }
690
    /// Async variant of `upsert_daily_health` that serializes concurrent
    /// writers to the same partition via a per-partition `tokio` mutex.
    ///
    /// NOTE(review): the file reads/writes here are blocking std I/O executed
    /// directly on the async runtime thread — consider `spawn_blocking` if
    /// partitions grow large; verify against runtime usage.
    pub async fn upsert_daily_health_async(&self, records: &[DailyHealth]) -> Result<()> {
        self.ensure_dir(EntityType::DailyHealth)?;

        // Group incoming rows by date partition.
        let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
            std::collections::HashMap::new();

        for record in records {
            let key = EntityType::DailyHealth.partition_key(record.date);
            partitions.entry(key).or_default().push(record.clone());
        }

        for (key, mut new_records) in partitions {
            // The guard must stay alive for the entire read-modify-write of
            // this partition; it drops at the end of each loop iteration.
            let lock = self.get_partition_lock(&key);
            let _guard = lock.lock().await;

            let path = self.partition_path(EntityType::DailyHealth, &key);

            let mut existing = self.read_daily_health_from_path(&path)?;

            // Incoming rows replace stored rows with the same (profile_id, date).
            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
                new_records.iter().map(|r| (r.profile_id, r.date)).collect();

            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));

            existing.append(&mut new_records);

            existing.sort_by_key(|r| (r.profile_id, r.date));

            let refs: Vec<&DailyHealth> = existing.iter().collect();
            let batch = Self::daily_health_to_batch(refs)?;
            self.write_batch(&path, &batch)?;
        }

        Ok(())
    }
736
737 fn read_daily_health_from_path(&self, path: &Path) -> Result<Vec<DailyHealth>> {
738 let batches = self.read_batches(path)?;
739 let mut records = Vec::new();
740
741 for batch in batches {
742 records.extend(Self::batch_to_daily_health(&batch)?);
743 }
744
745 Ok(records)
746 }
747
748 fn daily_health_to_batch(records: Vec<&DailyHealth>) -> Result<RecordBatch> {
749 let id: Int64Array = records.iter().map(|r| r.id).collect();
750 let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
751 let date: Date32Array = records
752 .iter()
753 .map(|r| {
754 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
755 Some((r.date - epoch).num_days() as i32)
756 })
757 .collect();
758 let steps: Int32Array = records.iter().map(|r| r.steps).collect();
759 let step_goal: Int32Array = records.iter().map(|r| r.step_goal).collect();
760 let total_calories: Int32Array = records.iter().map(|r| r.total_calories).collect();
761 let active_calories: Int32Array = records.iter().map(|r| r.active_calories).collect();
762 let bmr_calories: Int32Array = records.iter().map(|r| r.bmr_calories).collect();
763 let resting_hr: Int32Array = records.iter().map(|r| r.resting_hr).collect();
764 let sleep_seconds: Int32Array = records.iter().map(|r| r.sleep_seconds).collect();
765 let deep_sleep_seconds: Int32Array = records.iter().map(|r| r.deep_sleep_seconds).collect();
766 let light_sleep_seconds: Int32Array =
767 records.iter().map(|r| r.light_sleep_seconds).collect();
768 let rem_sleep_seconds: Int32Array = records.iter().map(|r| r.rem_sleep_seconds).collect();
769 let sleep_score: Int32Array = records.iter().map(|r| r.sleep_score).collect();
770 let avg_stress: Int32Array = records.iter().map(|r| r.avg_stress).collect();
771 let max_stress: Int32Array = records.iter().map(|r| r.max_stress).collect();
772 let body_battery_start: Int32Array = records.iter().map(|r| r.body_battery_start).collect();
773 let body_battery_end: Int32Array = records.iter().map(|r| r.body_battery_end).collect();
774 let hrv_weekly_avg: Int32Array = records.iter().map(|r| r.hrv_weekly_avg).collect();
775 let hrv_last_night: Int32Array = records.iter().map(|r| r.hrv_last_night).collect();
776 let hrv_status: StringArray = records.iter().map(|r| r.hrv_status.as_deref()).collect();
777 let avg_respiration: Float64Array = records.iter().map(|r| r.avg_respiration).collect();
778 let avg_spo2: Int32Array = records.iter().map(|r| r.avg_spo2).collect();
779 let lowest_spo2: Int32Array = records.iter().map(|r| r.lowest_spo2).collect();
780 let hydration_ml: Int32Array = records.iter().map(|r| r.hydration_ml).collect();
781 let moderate_intensity_min: Int32Array =
782 records.iter().map(|r| r.moderate_intensity_min).collect();
783 let vigorous_intensity_min: Int32Array =
784 records.iter().map(|r| r.vigorous_intensity_min).collect();
785 let raw_json: StringArray = records
786 .iter()
787 .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
788 .collect();
789
790 let schema = Arc::new(Schema::new(vec![
791 Field::new("id", DataType::Int64, true),
792 Field::new("profile_id", DataType::Int32, false),
793 Field::new("date", DataType::Date32, false),
794 Field::new("steps", DataType::Int32, true),
795 Field::new("step_goal", DataType::Int32, true),
796 Field::new("total_calories", DataType::Int32, true),
797 Field::new("active_calories", DataType::Int32, true),
798 Field::new("bmr_calories", DataType::Int32, true),
799 Field::new("resting_hr", DataType::Int32, true),
800 Field::new("sleep_seconds", DataType::Int32, true),
801 Field::new("deep_sleep_seconds", DataType::Int32, true),
802 Field::new("light_sleep_seconds", DataType::Int32, true),
803 Field::new("rem_sleep_seconds", DataType::Int32, true),
804 Field::new("sleep_score", DataType::Int32, true),
805 Field::new("avg_stress", DataType::Int32, true),
806 Field::new("max_stress", DataType::Int32, true),
807 Field::new("body_battery_start", DataType::Int32, true),
808 Field::new("body_battery_end", DataType::Int32, true),
809 Field::new("hrv_weekly_avg", DataType::Int32, true),
810 Field::new("hrv_last_night", DataType::Int32, true),
811 Field::new("hrv_status", DataType::Utf8, true),
812 Field::new("avg_respiration", DataType::Float64, true),
813 Field::new("avg_spo2", DataType::Int32, true),
814 Field::new("lowest_spo2", DataType::Int32, true),
815 Field::new("hydration_ml", DataType::Int32, true),
816 Field::new("moderate_intensity_min", DataType::Int32, true),
817 Field::new("vigorous_intensity_min", DataType::Int32, true),
818 Field::new("raw_json", DataType::Utf8, true),
819 ]));
820
821 RecordBatch::try_new(
822 schema,
823 vec![
824 Arc::new(id),
825 Arc::new(profile_id),
826 Arc::new(date),
827 Arc::new(steps),
828 Arc::new(step_goal),
829 Arc::new(total_calories),
830 Arc::new(active_calories),
831 Arc::new(bmr_calories),
832 Arc::new(resting_hr),
833 Arc::new(sleep_seconds),
834 Arc::new(deep_sleep_seconds),
835 Arc::new(light_sleep_seconds),
836 Arc::new(rem_sleep_seconds),
837 Arc::new(sleep_score),
838 Arc::new(avg_stress),
839 Arc::new(max_stress),
840 Arc::new(body_battery_start),
841 Arc::new(body_battery_end),
842 Arc::new(hrv_weekly_avg),
843 Arc::new(hrv_last_night),
844 Arc::new(hrv_status),
845 Arc::new(avg_respiration),
846 Arc::new(avg_spo2),
847 Arc::new(lowest_spo2),
848 Arc::new(hydration_ml),
849 Arc::new(moderate_intensity_min),
850 Arc::new(vigorous_intensity_min),
851 Arc::new(raw_json),
852 ],
853 )
854 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
855 }
856
    /// Decode a `RecordBatch` back into owned `DailyHealth` rows.
    ///
    /// Columns are addressed positionally (0..=27) and must match the schema
    /// emitted by `daily_health_to_batch`. The `unwrap()`ed downcasts panic
    /// if the on-disk schema has drifted from the writer's.
    fn batch_to_daily_health(batch: &RecordBatch) -> Result<Vec<DailyHealth>> {
        let len = batch.num_rows();
        let mut records = Vec::with_capacity(len);

        // Downcast every column to its concrete Arrow array type up front.
        let id = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let profile_id = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let date = batch
            .column(2)
            .as_any()
            .downcast_ref::<Date32Array>()
            .unwrap();
        let steps = batch
            .column(3)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let step_goal = batch
            .column(4)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let total_calories = batch
            .column(5)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let active_calories = batch
            .column(6)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let bmr_calories = batch
            .column(7)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let resting_hr = batch
            .column(8)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let sleep_seconds = batch
            .column(9)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let deep_sleep_seconds = batch
            .column(10)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let light_sleep_seconds = batch
            .column(11)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let rem_sleep_seconds = batch
            .column(12)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let sleep_score = batch
            .column(13)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let avg_stress = batch
            .column(14)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let max_stress = batch
            .column(15)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let body_battery_start = batch
            .column(16)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let body_battery_end = batch
            .column(17)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hrv_weekly_avg = batch
            .column(18)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hrv_last_night = batch
            .column(19)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hrv_status = batch
            .column(20)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let avg_respiration = batch
            .column(21)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let avg_spo2 = batch
            .column(22)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let lowest_spo2 = batch
            .column(23)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hydration_ml = batch
            .column(24)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let moderate_intensity_min = batch
            .column(25)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let vigorous_intensity_min = batch
            .column(26)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let raw_json = batch
            .column(27)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        // Date32 values are days since this epoch.
        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

        // Rebuild each row; `is_valid` gates nullable columns back to Option.
        for i in 0..len {
            records.push(DailyHealth {
                id: id.is_valid(i).then(|| id.value(i)),
                profile_id: profile_id.value(i),
                date: epoch + chrono::Duration::days(date.value(i) as i64),
                steps: steps.is_valid(i).then(|| steps.value(i)),
                step_goal: step_goal.is_valid(i).then(|| step_goal.value(i)),
                total_calories: total_calories.is_valid(i).then(|| total_calories.value(i)),
                active_calories: active_calories
                    .is_valid(i)
                    .then(|| active_calories.value(i)),
                bmr_calories: bmr_calories.is_valid(i).then(|| bmr_calories.value(i)),
                resting_hr: resting_hr.is_valid(i).then(|| resting_hr.value(i)),
                sleep_seconds: sleep_seconds.is_valid(i).then(|| sleep_seconds.value(i)),
                deep_sleep_seconds: deep_sleep_seconds
                    .is_valid(i)
                    .then(|| deep_sleep_seconds.value(i)),
                light_sleep_seconds: light_sleep_seconds
                    .is_valid(i)
                    .then(|| light_sleep_seconds.value(i)),
                rem_sleep_seconds: rem_sleep_seconds
                    .is_valid(i)
                    .then(|| rem_sleep_seconds.value(i)),
                sleep_score: sleep_score.is_valid(i).then(|| sleep_score.value(i)),
                avg_stress: avg_stress.is_valid(i).then(|| avg_stress.value(i)),
                max_stress: max_stress.is_valid(i).then(|| max_stress.value(i)),
                body_battery_start: body_battery_start
                    .is_valid(i)
                    .then(|| body_battery_start.value(i)),
                body_battery_end: body_battery_end
                    .is_valid(i)
                    .then(|| body_battery_end.value(i)),
                hrv_weekly_avg: hrv_weekly_avg.is_valid(i).then(|| hrv_weekly_avg.value(i)),
                hrv_last_night: hrv_last_night.is_valid(i).then(|| hrv_last_night.value(i)),
                hrv_status: hrv_status
                    .is_valid(i)
                    .then(|| hrv_status.value(i).to_string()),
                avg_respiration: avg_respiration
                    .is_valid(i)
                    .then(|| avg_respiration.value(i)),
                avg_spo2: avg_spo2.is_valid(i).then(|| avg_spo2.value(i)),
                lowest_spo2: lowest_spo2.is_valid(i).then(|| lowest_spo2.value(i)),
                hydration_ml: hydration_ml.is_valid(i).then(|| hydration_ml.value(i)),
                moderate_intensity_min: moderate_intensity_min
                    .is_valid(i)
                    .then(|| moderate_intensity_min.value(i)),
                vigorous_intensity_min: vigorous_intensity_min
                    .is_valid(i)
                    .then(|| vigorous_intensity_min.value(i)),
                // NOTE(review): unparseable stored JSON degrades to the
                // default value rather than erroring — confirm acceptable.
                raw_json: raw_json
                    .is_valid(i)
                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
            });
        }

        Ok(records)
    }
1061
1062 pub fn write_track_points(
1068 &self,
1069 activity_date: NaiveDate,
1070 points: &[TrackPoint],
1071 ) -> Result<()> {
1072 self.ensure_dir(EntityType::TrackPoints)?;
1073
1074 let key = EntityType::TrackPoints.partition_key(activity_date);
1075 let path = self.partition_path(EntityType::TrackPoints, &key);
1076
1077 let mut existing = self.read_track_points_from_path(&path)?;
1079
1080 if let Some(first) = points.first() {
1081 existing.retain(|p| p.activity_id != first.activity_id);
1082 }
1083
1084 existing.extend(points.iter().cloned());
1085
1086 existing.sort_by(|a, b| {
1088 a.activity_id
1089 .cmp(&b.activity_id)
1090 .then(a.timestamp.cmp(&b.timestamp))
1091 });
1092
1093 let refs: Vec<&TrackPoint> = existing.iter().collect();
1094 let batch = Self::track_points_to_batch(refs)?;
1095 self.write_batch(&path, &batch)?;
1096
1097 Ok(())
1098 }
1099
1100 pub async fn write_track_points_async(
1102 &self,
1103 activity_date: NaiveDate,
1104 points: &[TrackPoint],
1105 ) -> Result<()> {
1106 self.ensure_dir(EntityType::TrackPoints)?;
1107
1108 let key = EntityType::TrackPoints.partition_key(activity_date);
1109
1110 let lock = self.get_partition_lock(&key);
1112 let _guard = lock.lock().await;
1113
1114 let path = self.partition_path(EntityType::TrackPoints, &key);
1115
1116 let mut existing = self.read_track_points_from_path(&path)?;
1118
1119 if let Some(first) = points.first() {
1120 existing.retain(|p| p.activity_id != first.activity_id);
1121 }
1122
1123 existing.extend(points.iter().cloned());
1124
1125 existing.sort_by(|a, b| {
1127 a.activity_id
1128 .cmp(&b.activity_id)
1129 .then(a.timestamp.cmp(&b.timestamp))
1130 });
1131
1132 let refs: Vec<&TrackPoint> = existing.iter().collect();
1133 let batch = Self::track_points_to_batch(refs)?;
1134 self.write_batch(&path, &batch)?;
1135
1136 Ok(())
1137 }
1138
1139 fn read_track_points_from_path(&self, path: &Path) -> Result<Vec<TrackPoint>> {
1140 let batches = self.read_batches(path)?;
1141 let mut points = Vec::new();
1142
1143 for batch in batches {
1144 points.extend(Self::batch_to_track_points(&batch)?);
1145 }
1146
1147 Ok(points)
1148 }
1149
1150 fn track_points_to_batch(points: Vec<&TrackPoint>) -> Result<RecordBatch> {
1151 let id: Int64Array = points.iter().map(|p| p.id).collect();
1152 let activity_id: Int64Array = points.iter().map(|p| p.activity_id).collect();
1153 let timestamp: TimestampMicrosecondArray = points
1154 .iter()
1155 .map(|p| Some(p.timestamp.timestamp_micros()))
1156 .collect();
1157 let lat: Float64Array = points.iter().map(|p| p.lat).collect();
1158 let lon: Float64Array = points.iter().map(|p| p.lon).collect();
1159 let elevation: Float64Array = points.iter().map(|p| p.elevation).collect();
1160 let heart_rate: Int32Array = points.iter().map(|p| p.heart_rate).collect();
1161 let cadence: Int32Array = points.iter().map(|p| p.cadence).collect();
1162 let power: Int32Array = points.iter().map(|p| p.power).collect();
1163 let speed: Float64Array = points.iter().map(|p| p.speed).collect();
1164
1165 let schema = Arc::new(Schema::new(vec![
1166 Field::new("id", DataType::Int64, true),
1167 Field::new("activity_id", DataType::Int64, false),
1168 Field::new(
1169 "timestamp",
1170 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1171 false,
1172 ),
1173 Field::new("lat", DataType::Float64, true),
1174 Field::new("lon", DataType::Float64, true),
1175 Field::new("elevation", DataType::Float64, true),
1176 Field::new("heart_rate", DataType::Int32, true),
1177 Field::new("cadence", DataType::Int32, true),
1178 Field::new("power", DataType::Int32, true),
1179 Field::new("speed", DataType::Float64, true),
1180 ]));
1181
1182 RecordBatch::try_new(
1183 schema,
1184 vec![
1185 Arc::new(id),
1186 Arc::new(activity_id),
1187 Arc::new(timestamp),
1188 Arc::new(lat),
1189 Arc::new(lon),
1190 Arc::new(elevation),
1191 Arc::new(heart_rate),
1192 Arc::new(cadence),
1193 Arc::new(power),
1194 Arc::new(speed),
1195 ],
1196 )
1197 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1198 }
1199
1200 fn batch_to_track_points(batch: &RecordBatch) -> Result<Vec<TrackPoint>> {
1201 let len = batch.num_rows();
1202 let mut points = Vec::with_capacity(len);
1203
1204 let id = batch
1205 .column(0)
1206 .as_any()
1207 .downcast_ref::<Int64Array>()
1208 .unwrap();
1209 let activity_id = batch
1210 .column(1)
1211 .as_any()
1212 .downcast_ref::<Int64Array>()
1213 .unwrap();
1214 let timestamp = batch
1215 .column(2)
1216 .as_any()
1217 .downcast_ref::<TimestampMicrosecondArray>()
1218 .unwrap();
1219 let lat = batch
1220 .column(3)
1221 .as_any()
1222 .downcast_ref::<Float64Array>()
1223 .unwrap();
1224 let lon = batch
1225 .column(4)
1226 .as_any()
1227 .downcast_ref::<Float64Array>()
1228 .unwrap();
1229 let elevation = batch
1230 .column(5)
1231 .as_any()
1232 .downcast_ref::<Float64Array>()
1233 .unwrap();
1234 let heart_rate = batch
1235 .column(6)
1236 .as_any()
1237 .downcast_ref::<Int32Array>()
1238 .unwrap();
1239 let cadence = batch
1240 .column(7)
1241 .as_any()
1242 .downcast_ref::<Int32Array>()
1243 .unwrap();
1244 let power = batch
1245 .column(8)
1246 .as_any()
1247 .downcast_ref::<Int32Array>()
1248 .unwrap();
1249 let speed = batch
1250 .column(9)
1251 .as_any()
1252 .downcast_ref::<Float64Array>()
1253 .unwrap();
1254
1255 for i in 0..len {
1256 points.push(TrackPoint {
1257 id: id.is_valid(i).then(|| id.value(i)),
1258 activity_id: activity_id.value(i),
1259 timestamp: DateTime::from_timestamp_micros(timestamp.value(i)).unwrap_or_default(),
1260 lat: lat.is_valid(i).then(|| lat.value(i)),
1261 lon: lon.is_valid(i).then(|| lon.value(i)),
1262 elevation: elevation.is_valid(i).then(|| elevation.value(i)),
1263 heart_rate: heart_rate.is_valid(i).then(|| heart_rate.value(i)),
1264 cadence: cadence.is_valid(i).then(|| cadence.value(i)),
1265 power: power.is_valid(i).then(|| power.value(i)),
1266 speed: speed.is_valid(i).then(|| speed.value(i)),
1267 });
1268 }
1269
1270 Ok(points)
1271 }
1272
1273 pub fn upsert_performance_metrics(&self, records: &[PerformanceMetrics]) -> Result<()> {
1279 self.ensure_dir(EntityType::PerformanceMetrics)?;
1280
1281 let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
1283 std::collections::HashMap::new();
1284
1285 for record in records {
1286 let key = EntityType::PerformanceMetrics.partition_key(record.date);
1287 partitions.entry(key).or_default().push(record.clone());
1288 }
1289
1290 for (key, mut new_records) in partitions {
1292 let path = self.partition_path(EntityType::PerformanceMetrics, &key);
1293
1294 let mut existing = self.read_performance_metrics_from_path(&path)?;
1296
1297 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1299 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1300
1301 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1303
1304 existing.append(&mut new_records);
1306
1307 existing.sort_by_key(|r| (r.profile_id, r.date));
1309
1310 let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
1312 let batch = Self::performance_metrics_to_batch(refs)?;
1313 self.write_batch(&path, &batch)?;
1314 }
1315
1316 Ok(())
1317 }
1318
1319 pub async fn upsert_performance_metrics_async(
1321 &self,
1322 records: &[PerformanceMetrics],
1323 ) -> Result<()> {
1324 self.ensure_dir(EntityType::PerformanceMetrics)?;
1325
1326 let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
1328 std::collections::HashMap::new();
1329
1330 for record in records {
1331 let key = EntityType::PerformanceMetrics.partition_key(record.date);
1332 partitions.entry(key).or_default().push(record.clone());
1333 }
1334
1335 for (key, mut new_records) in partitions {
1337 let lock = self.get_partition_lock(&key);
1339 let _guard = lock.lock().await;
1340
1341 let path = self.partition_path(EntityType::PerformanceMetrics, &key);
1342
1343 let mut existing = self.read_performance_metrics_from_path(&path)?;
1345
1346 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1348 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1349
1350 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1352
1353 existing.append(&mut new_records);
1355
1356 existing.sort_by_key(|r| (r.profile_id, r.date));
1358
1359 let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
1361 let batch = Self::performance_metrics_to_batch(refs)?;
1362 self.write_batch(&path, &batch)?;
1363 }
1364
1365 Ok(())
1366 }
1367
1368 fn read_performance_metrics_from_path(&self, path: &Path) -> Result<Vec<PerformanceMetrics>> {
1369 let batches = self.read_batches(path)?;
1370 let mut records = Vec::new();
1371
1372 for batch in batches {
1373 records.extend(Self::batch_to_performance_metrics(&batch)?);
1374 }
1375
1376 Ok(records)
1377 }
1378
1379 fn performance_metrics_to_batch(records: Vec<&PerformanceMetrics>) -> Result<RecordBatch> {
1380 let id: Int64Array = records.iter().map(|r| r.id).collect();
1381 let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
1382 let date: Date32Array = records
1383 .iter()
1384 .map(|r| {
1385 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1386 Some((r.date - epoch).num_days() as i32)
1387 })
1388 .collect();
1389 let vo2max: Float64Array = records.iter().map(|r| r.vo2max).collect();
1390 let fitness_age: Int32Array = records.iter().map(|r| r.fitness_age).collect();
1391 let training_readiness: Int32Array = records.iter().map(|r| r.training_readiness).collect();
1392 let training_status: StringArray = records
1393 .iter()
1394 .map(|r| r.training_status.as_deref())
1395 .collect();
1396 let lactate_threshold_hr: Int32Array =
1397 records.iter().map(|r| r.lactate_threshold_hr).collect();
1398 let lactate_threshold_pace: Float64Array =
1399 records.iter().map(|r| r.lactate_threshold_pace).collect();
1400 let race_5k_sec: Int32Array = records.iter().map(|r| r.race_5k_sec).collect();
1401 let race_10k_sec: Int32Array = records.iter().map(|r| r.race_10k_sec).collect();
1402 let race_half_sec: Int32Array = records.iter().map(|r| r.race_half_sec).collect();
1403 let race_marathon_sec: Int32Array = records.iter().map(|r| r.race_marathon_sec).collect();
1404 let endurance_score: Int32Array = records.iter().map(|r| r.endurance_score).collect();
1405 let hill_score: Int32Array = records.iter().map(|r| r.hill_score).collect();
1406 let raw_json: StringArray = records
1407 .iter()
1408 .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
1409 .collect();
1410
1411 let schema = Arc::new(Schema::new(vec![
1412 Field::new("id", DataType::Int64, true),
1413 Field::new("profile_id", DataType::Int32, false),
1414 Field::new("date", DataType::Date32, false),
1415 Field::new("vo2max", DataType::Float64, true),
1416 Field::new("fitness_age", DataType::Int32, true),
1417 Field::new("training_readiness", DataType::Int32, true),
1418 Field::new("training_status", DataType::Utf8, true),
1419 Field::new("lactate_threshold_hr", DataType::Int32, true),
1420 Field::new("lactate_threshold_pace", DataType::Float64, true),
1421 Field::new("race_5k_sec", DataType::Int32, true),
1422 Field::new("race_10k_sec", DataType::Int32, true),
1423 Field::new("race_half_sec", DataType::Int32, true),
1424 Field::new("race_marathon_sec", DataType::Int32, true),
1425 Field::new("endurance_score", DataType::Int32, true),
1426 Field::new("hill_score", DataType::Int32, true),
1427 Field::new("raw_json", DataType::Utf8, true),
1428 ]));
1429
1430 RecordBatch::try_new(
1431 schema,
1432 vec![
1433 Arc::new(id),
1434 Arc::new(profile_id),
1435 Arc::new(date),
1436 Arc::new(vo2max),
1437 Arc::new(fitness_age),
1438 Arc::new(training_readiness),
1439 Arc::new(training_status),
1440 Arc::new(lactate_threshold_hr),
1441 Arc::new(lactate_threshold_pace),
1442 Arc::new(race_5k_sec),
1443 Arc::new(race_10k_sec),
1444 Arc::new(race_half_sec),
1445 Arc::new(race_marathon_sec),
1446 Arc::new(endurance_score),
1447 Arc::new(hill_score),
1448 Arc::new(raw_json),
1449 ],
1450 )
1451 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1452 }
1453
1454 fn batch_to_performance_metrics(batch: &RecordBatch) -> Result<Vec<PerformanceMetrics>> {
1455 let len = batch.num_rows();
1456 let mut records = Vec::with_capacity(len);
1457
1458 let id = batch
1459 .column(0)
1460 .as_any()
1461 .downcast_ref::<Int64Array>()
1462 .unwrap();
1463 let profile_id = batch
1464 .column(1)
1465 .as_any()
1466 .downcast_ref::<Int32Array>()
1467 .unwrap();
1468 let date = batch
1469 .column(2)
1470 .as_any()
1471 .downcast_ref::<Date32Array>()
1472 .unwrap();
1473 let vo2max = batch
1474 .column(3)
1475 .as_any()
1476 .downcast_ref::<Float64Array>()
1477 .unwrap();
1478 let fitness_age = batch
1479 .column(4)
1480 .as_any()
1481 .downcast_ref::<Int32Array>()
1482 .unwrap();
1483 let training_readiness = batch
1484 .column(5)
1485 .as_any()
1486 .downcast_ref::<Int32Array>()
1487 .unwrap();
1488 let training_status = batch
1489 .column(6)
1490 .as_any()
1491 .downcast_ref::<StringArray>()
1492 .unwrap();
1493 let lactate_threshold_hr = batch
1494 .column(7)
1495 .as_any()
1496 .downcast_ref::<Int32Array>()
1497 .unwrap();
1498 let lactate_threshold_pace = batch
1499 .column(8)
1500 .as_any()
1501 .downcast_ref::<Float64Array>()
1502 .unwrap();
1503 let race_5k_sec = batch
1504 .column(9)
1505 .as_any()
1506 .downcast_ref::<Int32Array>()
1507 .unwrap();
1508 let race_10k_sec = batch
1509 .column(10)
1510 .as_any()
1511 .downcast_ref::<Int32Array>()
1512 .unwrap();
1513 let race_half_sec = batch
1514 .column(11)
1515 .as_any()
1516 .downcast_ref::<Int32Array>()
1517 .unwrap();
1518 let race_marathon_sec = batch
1519 .column(12)
1520 .as_any()
1521 .downcast_ref::<Int32Array>()
1522 .unwrap();
1523 let endurance_score = batch
1524 .column(13)
1525 .as_any()
1526 .downcast_ref::<Int32Array>()
1527 .unwrap();
1528 let hill_score = batch
1529 .column(14)
1530 .as_any()
1531 .downcast_ref::<Int32Array>()
1532 .unwrap();
1533 let raw_json = batch
1534 .column(15)
1535 .as_any()
1536 .downcast_ref::<StringArray>()
1537 .unwrap();
1538
1539 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1540
1541 for i in 0..len {
1542 records.push(PerformanceMetrics {
1543 id: id.is_valid(i).then(|| id.value(i)),
1544 profile_id: profile_id.value(i),
1545 date: epoch + chrono::Duration::days(date.value(i) as i64),
1546 vo2max: vo2max.is_valid(i).then(|| vo2max.value(i)),
1547 fitness_age: fitness_age.is_valid(i).then(|| fitness_age.value(i)),
1548 training_readiness: training_readiness
1549 .is_valid(i)
1550 .then(|| training_readiness.value(i)),
1551 training_status: training_status
1552 .is_valid(i)
1553 .then(|| training_status.value(i).to_string()),
1554 lactate_threshold_hr: lactate_threshold_hr
1555 .is_valid(i)
1556 .then(|| lactate_threshold_hr.value(i)),
1557 lactate_threshold_pace: lactate_threshold_pace
1558 .is_valid(i)
1559 .then(|| lactate_threshold_pace.value(i)),
1560 race_5k_sec: race_5k_sec.is_valid(i).then(|| race_5k_sec.value(i)),
1561 race_10k_sec: race_10k_sec.is_valid(i).then(|| race_10k_sec.value(i)),
1562 race_half_sec: race_half_sec.is_valid(i).then(|| race_half_sec.value(i)),
1563 race_marathon_sec: race_marathon_sec
1564 .is_valid(i)
1565 .then(|| race_marathon_sec.value(i)),
1566 endurance_score: endurance_score
1567 .is_valid(i)
1568 .then(|| endurance_score.value(i)),
1569 hill_score: hill_score.is_valid(i).then(|| hill_score.value(i)),
1570 raw_json: raw_json
1571 .is_valid(i)
1572 .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
1573 });
1574 }
1575
1576 Ok(records)
1577 }
1578
1579 pub fn upsert_weight(&self, records: &[WeightEntry]) -> Result<()> {
1585 self.ensure_dir(EntityType::Weight)?;
1586
1587 let mut partitions: std::collections::HashMap<String, Vec<WeightEntry>> =
1589 std::collections::HashMap::new();
1590
1591 for record in records {
1592 let key = EntityType::Weight.partition_key(record.date);
1593 partitions.entry(key).or_default().push(record.clone());
1594 }
1595
1596 for (key, mut new_records) in partitions {
1598 let path = self.partition_path(EntityType::Weight, &key);
1599
1600 let mut existing = self.read_weight_from_path(&path)?;
1602
1603 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1605 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1606
1607 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1609
1610 existing.append(&mut new_records);
1612
1613 existing.sort_by_key(|r| (r.profile_id, r.date));
1615
1616 let refs: Vec<&WeightEntry> = existing.iter().collect();
1618 let batch = Self::weight_to_batch(refs)?;
1619 self.write_batch(&path, &batch)?;
1620 }
1621
1622 Ok(())
1623 }
1624
1625 fn read_weight_from_path(&self, path: &Path) -> Result<Vec<WeightEntry>> {
1626 let batches = self.read_batches(path)?;
1627 let mut records = Vec::new();
1628
1629 for batch in batches {
1630 records.extend(Self::batch_to_weight(&batch)?);
1631 }
1632
1633 Ok(records)
1634 }
1635
1636 fn weight_to_batch(records: Vec<&WeightEntry>) -> Result<RecordBatch> {
1637 let id: Int64Array = records.iter().map(|r| r.id).collect();
1638 let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
1639 let date: Date32Array = records
1640 .iter()
1641 .map(|r| {
1642 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1643 Some((r.date - epoch).num_days() as i32)
1644 })
1645 .collect();
1646 let weight_kg: Float64Array = records.iter().map(|r| r.weight_kg).collect();
1647 let bmi: Float64Array = records.iter().map(|r| r.bmi).collect();
1648 let body_fat_pct: Float64Array = records.iter().map(|r| r.body_fat_pct).collect();
1649 let muscle_mass_kg: Float64Array = records.iter().map(|r| r.muscle_mass_kg).collect();
1650
1651 let schema = Arc::new(Schema::new(vec![
1652 Field::new("id", DataType::Int64, true),
1653 Field::new("profile_id", DataType::Int32, false),
1654 Field::new("date", DataType::Date32, false),
1655 Field::new("weight_kg", DataType::Float64, true),
1656 Field::new("bmi", DataType::Float64, true),
1657 Field::new("body_fat_pct", DataType::Float64, true),
1658 Field::new("muscle_mass_kg", DataType::Float64, true),
1659 ]));
1660
1661 RecordBatch::try_new(
1662 schema,
1663 vec![
1664 Arc::new(id),
1665 Arc::new(profile_id),
1666 Arc::new(date),
1667 Arc::new(weight_kg),
1668 Arc::new(bmi),
1669 Arc::new(body_fat_pct),
1670 Arc::new(muscle_mass_kg),
1671 ],
1672 )
1673 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1674 }
1675
1676 fn batch_to_weight(batch: &RecordBatch) -> Result<Vec<WeightEntry>> {
1677 let len = batch.num_rows();
1678 let mut records = Vec::with_capacity(len);
1679
1680 let id = batch
1681 .column(0)
1682 .as_any()
1683 .downcast_ref::<Int64Array>()
1684 .unwrap();
1685 let profile_id = batch
1686 .column(1)
1687 .as_any()
1688 .downcast_ref::<Int32Array>()
1689 .unwrap();
1690 let date = batch
1691 .column(2)
1692 .as_any()
1693 .downcast_ref::<Date32Array>()
1694 .unwrap();
1695 let weight_kg = batch
1696 .column(3)
1697 .as_any()
1698 .downcast_ref::<Float64Array>()
1699 .unwrap();
1700 let bmi = batch
1701 .column(4)
1702 .as_any()
1703 .downcast_ref::<Float64Array>()
1704 .unwrap();
1705 let body_fat_pct = batch
1706 .column(5)
1707 .as_any()
1708 .downcast_ref::<Float64Array>()
1709 .unwrap();
1710 let muscle_mass_kg = batch
1711 .column(6)
1712 .as_any()
1713 .downcast_ref::<Float64Array>()
1714 .unwrap();
1715
1716 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1717
1718 for i in 0..len {
1719 records.push(WeightEntry {
1720 id: id.is_valid(i).then(|| id.value(i)),
1721 profile_id: profile_id.value(i),
1722 date: epoch + chrono::Duration::days(date.value(i) as i64),
1723 weight_kg: weight_kg.is_valid(i).then(|| weight_kg.value(i)),
1724 bmi: bmi.is_valid(i).then(|| bmi.value(i)),
1725 body_fat_pct: body_fat_pct.is_valid(i).then(|| body_fat_pct.value(i)),
1726 muscle_mass_kg: muscle_mass_kg.is_valid(i).then(|| muscle_mass_kg.value(i)),
1727 });
1728 }
1729
1730 Ok(records)
1731 }
1732
1733 pub fn write_profiles(&self, profiles: &[Profile]) -> Result<()> {
1739 fs::create_dir_all(&self.base_path)
1740 .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
1741
1742 let path = self.partition_path(EntityType::Profiles, "");
1743 let refs: Vec<&Profile> = profiles.iter().collect();
1744 let batch = Self::profiles_to_batch(refs)?;
1745 self.write_batch(&path, &batch)?;
1746
1747 Ok(())
1748 }
1749
1750 pub fn read_profiles(&self) -> Result<Vec<Profile>> {
1752 let path = self.partition_path(EntityType::Profiles, "");
1753 let batches = self.read_batches(&path)?;
1754 let mut profiles = Vec::new();
1755
1756 for batch in batches {
1757 profiles.extend(Self::batch_to_profiles(&batch)?);
1758 }
1759
1760 Ok(profiles)
1761 }
1762
1763 fn profiles_to_batch(profiles: Vec<&Profile>) -> Result<RecordBatch> {
1764 let profile_id: Int32Array = profiles.iter().map(|p| p.profile_id).collect();
1765 let display_name: StringArray = profiles
1766 .iter()
1767 .map(|p| Some(p.display_name.as_str()))
1768 .collect();
1769 let user_id: Int64Array = profiles.iter().map(|p| p.user_id).collect();
1770 let created_at: TimestampMicrosecondArray = profiles
1771 .iter()
1772 .map(|p| p.created_at.map(|t| t.timestamp_micros()))
1773 .collect();
1774 let last_sync_at: TimestampMicrosecondArray = profiles
1775 .iter()
1776 .map(|p| p.last_sync_at.map(|t| t.timestamp_micros()))
1777 .collect();
1778
1779 let schema = Arc::new(Schema::new(vec![
1780 Field::new("profile_id", DataType::Int32, false),
1781 Field::new("display_name", DataType::Utf8, false),
1782 Field::new("user_id", DataType::Int64, true),
1783 Field::new(
1784 "created_at",
1785 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1786 true,
1787 ),
1788 Field::new(
1789 "last_sync_at",
1790 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1791 true,
1792 ),
1793 ]));
1794
1795 RecordBatch::try_new(
1796 schema,
1797 vec![
1798 Arc::new(profile_id),
1799 Arc::new(display_name),
1800 Arc::new(user_id),
1801 Arc::new(created_at),
1802 Arc::new(last_sync_at),
1803 ],
1804 )
1805 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1806 }
1807
1808 fn batch_to_profiles(batch: &RecordBatch) -> Result<Vec<Profile>> {
1809 let len = batch.num_rows();
1810 let mut profiles = Vec::with_capacity(len);
1811
1812 let profile_id = batch
1813 .column(0)
1814 .as_any()
1815 .downcast_ref::<Int32Array>()
1816 .unwrap();
1817 let display_name = batch
1818 .column(1)
1819 .as_any()
1820 .downcast_ref::<StringArray>()
1821 .unwrap();
1822 let user_id = batch
1823 .column(2)
1824 .as_any()
1825 .downcast_ref::<Int64Array>()
1826 .unwrap();
1827 let created_at = batch
1828 .column(3)
1829 .as_any()
1830 .downcast_ref::<TimestampMicrosecondArray>()
1831 .unwrap();
1832 let last_sync_at = batch
1833 .column(4)
1834 .as_any()
1835 .downcast_ref::<TimestampMicrosecondArray>()
1836 .unwrap();
1837
1838 for i in 0..len {
1839 profiles.push(Profile {
1840 profile_id: profile_id.value(i),
1841 display_name: display_name.value(i).to_string(),
1842 user_id: user_id.is_valid(i).then(|| user_id.value(i)),
1843 created_at: created_at.is_valid(i).then(|| {
1844 DateTime::from_timestamp_micros(created_at.value(i)).unwrap_or_default()
1845 }),
1846 last_sync_at: last_sync_at.is_valid(i).then(|| {
1847 DateTime::from_timestamp_micros(last_sync_at.value(i)).unwrap_or_default()
1848 }),
1849 });
1850 }
1851
1852 Ok(profiles)
1853 }
1854
    /// Root directory under which all partition files are stored.
    pub fn base_path(&self) -> &Path {
        &self.base_path
    }
1859}
1860
1861#[cfg(test)]
1862mod tests {
1863 use super::*;
1864 use tempfile::TempDir;
1865
    /// Round-trip: an activity written via `upsert_activities` can be read
    /// back unchanged from its date-derived partition file.
    #[test]
    fn test_activity_round_trip() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        // Fixture with most optional fields populated so nullable columns
        // are exercised (1703001600 = 2023-12-19 UTC).
        let activities = vec![Activity {
            activity_id: 123,
            profile_id: 1,
            activity_name: Some("Morning Run".to_string()),
            activity_type: Some("running".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            duration_sec: Some(3600.0),
            distance_m: Some(10000.0),
            calories: Some(500),
            avg_hr: Some(150),
            max_hr: Some(175),
            avg_speed: Some(2.78),
            max_speed: Some(3.5),
            elevation_gain: Some(100.0),
            elevation_loss: Some(100.0),
            avg_cadence: Some(180.0),
            avg_power: None,
            normalized_power: None,
            training_effect: Some(3.5),
            training_load: Some(120.0),
            start_lat: Some(37.7749),
            start_lon: Some(-122.4194),
            end_lat: Some(37.7849),
            end_lon: Some(-122.4094),
            ground_contact_time: Some(250.0),
            vertical_oscillation: Some(8.5),
            stride_length: Some(1.2),
            location_name: Some("San Francisco".to_string()),
            raw_json: None,
        }];

        store.upsert_activities(&activities).unwrap();

        // The partition key is derived from the local start date.
        let key = EntityType::Activities
            .partition_key(activities[0].start_time_local.unwrap().date_naive());
        let path = store.partition_path(EntityType::Activities, &key);
        let read_back = store.read_activities_from_path(&path).unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].activity_id, 123);
        assert_eq!(read_back[0].activity_name, Some("Morning Run".to_string()));
    }
1915
    /// Round-trip: a daily-health record written via `upsert_daily_health`
    /// can be read back (including its Date32-encoded date) unchanged.
    #[test]
    fn test_daily_health_round_trip() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        // Fixture with every optional field populated except raw_json.
        let records = vec![DailyHealth {
            id: None,
            profile_id: 1,
            date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
            steps: Some(10000),
            step_goal: Some(8000),
            total_calories: Some(2500),
            active_calories: Some(500),
            bmr_calories: Some(2000),
            resting_hr: Some(55),
            sleep_seconds: Some(28800),
            deep_sleep_seconds: Some(7200),
            light_sleep_seconds: Some(14400),
            rem_sleep_seconds: Some(7200),
            sleep_score: Some(85),
            avg_stress: Some(30),
            max_stress: Some(75),
            body_battery_start: Some(95),
            body_battery_end: Some(45),
            hrv_weekly_avg: Some(45),
            hrv_last_night: Some(48),
            hrv_status: Some("balanced".to_string()),
            avg_respiration: Some(14.5),
            avg_spo2: Some(96),
            lowest_spo2: Some(92),
            hydration_ml: Some(2500),
            moderate_intensity_min: Some(30),
            vigorous_intensity_min: Some(20),
            raw_json: None,
        }];

        store.upsert_daily_health(&records).unwrap();

        // Locate the partition file for the record's date and read it back.
        let key = EntityType::DailyHealth.partition_key(records[0].date);
        let path = store.partition_path(EntityType::DailyHealth, &key);
        let read_back = store.read_daily_health_from_path(&path).unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].steps, Some(10000));
        assert_eq!(
            read_back[0].date,
            NaiveDate::from_ymd_opt(2024, 12, 15).unwrap()
        );
    }
1966
    /// `has_daily_health` is false before any write and true afterwards for
    /// the same (profile_id, date) pair.
    #[test]
    fn test_has_daily_health() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());
        let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();

        // No partition file exists yet, so the probe must return false.
        assert!(!store.has_daily_health(1, date).unwrap());

        // Minimal record: only `steps` is set; all other optionals are None.
        let records = vec![DailyHealth {
            id: None,
            profile_id: 1,
            date,
            steps: Some(10000),
            step_goal: None,
            total_calories: None,
            active_calories: None,
            bmr_calories: None,
            resting_hr: None,
            sleep_seconds: None,
            deep_sleep_seconds: None,
            light_sleep_seconds: None,
            rem_sleep_seconds: None,
            sleep_score: None,
            avg_stress: None,
            max_stress: None,
            body_battery_start: None,
            body_battery_end: None,
            hrv_weekly_avg: None,
            hrv_last_night: None,
            hrv_status: None,
            avg_respiration: None,
            avg_spo2: None,
            lowest_spo2: None,
            hydration_ml: None,
            moderate_intensity_min: None,
            vigorous_intensity_min: None,
            raw_json: None,
        }];

        store.upsert_daily_health(&records).unwrap();
        assert!(store.has_daily_health(1, date).unwrap());
    }
2009
    /// `has_track_points` is false before any write and true afterwards for
    /// the same (activity_id, date) pair.
    #[test]
    fn test_has_track_points() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());
        let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();

        // No partition file exists yet, so the probe must return false.
        assert!(!store.has_track_points(42, date).unwrap());

        // Single point with position data only; sensor channels left null.
        let points = vec![TrackPoint {
            id: None,
            activity_id: 42,
            timestamp: DateTime::from_timestamp(1703001600, 0).unwrap(),
            lat: Some(1.0),
            lon: Some(2.0),
            elevation: Some(3.0),
            heart_rate: None,
            cadence: None,
            power: None,
            speed: None,
        }];

        store.write_track_points(date, &points).unwrap();
        assert!(store.has_track_points(42, date).unwrap());
    }
2034
    /// A reader and a writer running concurrently must not interfere: the
    /// reader sees the pre-seeded activity while the writer adds a new one.
    #[test]
    fn test_concurrent_read_write() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let temp = TempDir::new().unwrap();
        let store = Arc::new(ParquetStore::new(temp.path()));

        // Seed one activity dated 2023-12-19 (epoch 1703001600) before the
        // threads start.
        let initial_activities = vec![Activity {
            activity_id: 1,
            profile_id: 1,
            activity_name: Some("Initial Activity".to_string()),
            activity_type: Some("running".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            duration_sec: Some(3600.0),
            distance_m: Some(10000.0),
            calories: None,
            avg_hr: None,
            max_hr: None,
            avg_speed: None,
            max_speed: None,
            elevation_gain: None,
            elevation_loss: None,
            avg_cadence: None,
            avg_power: None,
            normalized_power: None,
            training_effect: None,
            training_load: None,
            start_lat: None,
            start_lon: None,
            end_lat: None,
            end_lon: None,
            ground_contact_time: None,
            vertical_oscillation: None,
            stride_length: None,
            location_name: None,
            raw_json: None,
        }];
        store.upsert_activities(&initial_activities).unwrap();

        // The barrier releases both threads at (roughly) the same instant.
        let barrier = Arc::new(Barrier::new(2));
        let store_reader = Arc::clone(&store);
        let barrier_reader = Arc::clone(&barrier);

        // Reader: fetch the partition holding the seeded 2023-12-19 activity.
        let reader_handle = thread::spawn(move || {
            barrier_reader.wait(); let key = EntityType::Activities
                .partition_key(NaiveDate::from_ymd_opt(2023, 12, 19).unwrap());
            let path = store_reader.partition_path(EntityType::Activities, &key);
            let activities = store_reader.read_activities_from_path(&path).unwrap();

            assert!(!activities.is_empty(), "Reader should see initial data");
            activities.len()
        });

        // Writer: upsert an activity dated 1703606400 (one week later).
        // NOTE(review): the final assertion (`read_count == 1`) assumes this
        // date lands in a different partition than 2023-12-19 — confirm
        // against `EntityType::partition_key` granularity.
        let writer_handle = thread::spawn(move || {
            barrier.wait(); let new_activities = vec![Activity {
                activity_id: 2,
                profile_id: 1,
                activity_name: Some("Concurrent Activity".to_string()),
                activity_type: Some("cycling".to_string()),
                start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()), start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
                duration_sec: Some(7200.0),
                distance_m: Some(50000.0),
                calories: None,
                avg_hr: None,
                max_hr: None,
                avg_speed: None,
                max_speed: None,
                elevation_gain: None,
                elevation_loss: None,
                avg_cadence: None,
                avg_power: None,
                normalized_power: None,
                training_effect: None,
                training_load: None,
                start_lat: None,
                start_lon: None,
                end_lat: None,
                end_lon: None,
                ground_contact_time: None,
                vertical_oscillation: None,
                stride_length: None,
                location_name: None,
                raw_json: None,
            }];
            store.upsert_activities(&new_activities).unwrap();
            2 });

        let read_count = reader_handle.join().expect("Reader thread panicked");
        let written_id = writer_handle.join().expect("Writer thread panicked");

        assert_eq!(read_count, 1, "Reader should have read 1 activity");
        assert_eq!(written_id, 2, "Writer should have written activity 2");
    }
2144
2145 #[test]
2146 fn test_duckdb_glob_query() {
2147 use duckdb::Connection;
2148
2149 let temp = TempDir::new().unwrap();
2150 let store = ParquetStore::new(temp.path());
2151
2152 let activities = vec![
2154 Activity {
2155 activity_id: 1,
2156 profile_id: 1,
2157 activity_name: Some("Week 51 Run".to_string()),
2158 activity_type: Some("running".to_string()),
2159 start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()), start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
2161 duration_sec: Some(3600.0),
2162 distance_m: Some(10000.0),
2163 calories: None,
2164 avg_hr: None,
2165 max_hr: None,
2166 avg_speed: None,
2167 max_speed: None,
2168 elevation_gain: None,
2169 elevation_loss: None,
2170 avg_cadence: None,
2171 avg_power: None,
2172 normalized_power: None,
2173 training_effect: None,
2174 training_load: None,
2175 start_lat: None,
2176 start_lon: None,
2177 end_lat: None,
2178 end_lon: None,
2179 ground_contact_time: None,
2180 vertical_oscillation: None,
2181 stride_length: None,
2182 location_name: None,
2183 raw_json: None,
2184 },
2185 Activity {
2186 activity_id: 2,
2187 profile_id: 1,
2188 activity_name: Some("Week 52 Ride".to_string()),
2189 activity_type: Some("cycling".to_string()),
2190 start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()), start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
2192 duration_sec: Some(7200.0),
2193 distance_m: Some(50000.0),
2194 calories: None,
2195 avg_hr: None,
2196 max_hr: None,
2197 avg_speed: None,
2198 max_speed: None,
2199 elevation_gain: None,
2200 elevation_loss: None,
2201 avg_cadence: None,
2202 avg_power: None,
2203 normalized_power: None,
2204 training_effect: None,
2205 training_load: None,
2206 start_lat: None,
2207 start_lon: None,
2208 end_lat: None,
2209 end_lon: None,
2210 ground_contact_time: None,
2211 vertical_oscillation: None,
2212 stride_length: None,
2213 location_name: None,
2214 raw_json: None,
2215 },
2216 ];
2217 store.upsert_activities(&activities).unwrap();
2218
2219 let conn = Connection::open_in_memory().unwrap();
2221 let glob_pattern = format!("{}/*.parquet", temp.path().join("activities").display());
2222
2223 let mut stmt = conn
2224 .prepare(&format!(
2225 "SELECT activity_id, activity_name FROM '{}' ORDER BY activity_id",
2226 glob_pattern
2227 ))
2228 .unwrap();
2229
2230 let results: Vec<(i64, String)> = stmt
2231 .query_map([], |row| {
2232 Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
2233 })
2234 .unwrap()
2235 .filter_map(|r| r.ok())
2236 .collect();
2237
2238 assert_eq!(results.len(), 2);
2240 assert_eq!(results[0], (1, "Week 51 Run".to_string()));
2241 assert_eq!(results[1], (2, "Week 52 Ride".to_string()));
2242 }
2243}