1use std::fs::{self, File};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::*;
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow::record_batch::RecordBatch;
13use chrono::{DateTime, NaiveDate};
14use dashmap::DashMap;
15use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
16use parquet::arrow::ArrowWriter;
17use parquet::basic::Compression;
18use parquet::file::properties::WriterProperties;
19use tokio::sync::Mutex as TokioMutex;
20
21use crate::db::models::{
22 Activity, DailyHealth, PerformanceMetrics, Profile, TrackPoint, WeightEntry,
23};
24use crate::error::{GarminError, Result};
25
26use super::partitions::EntityType;
27
28#[derive(Clone)]
33pub struct ParquetStore {
34 base_path: PathBuf,
35 partition_locks: Arc<DashMap<String, Arc<TokioMutex<()>>>>,
37}
38
39impl ParquetStore {
40 pub fn new(base_path: impl Into<PathBuf>) -> Self {
42 Self {
43 base_path: base_path.into(),
44 partition_locks: Arc::new(DashMap::new()),
45 }
46 }
47
48 fn get_partition_lock(&self, partition_key: &str) -> Arc<TokioMutex<()>> {
50 self.partition_locks
51 .entry(partition_key.to_string())
52 .or_insert_with(|| Arc::new(TokioMutex::new(())))
53 .clone()
54 }
55
56 pub fn has_daily_health(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
58 let key = EntityType::DailyHealth.partition_key(date);
59 let path = self.partition_path(EntityType::DailyHealth, &key);
60 if !path.exists() {
61 return Ok(false);
62 }
63 let records = self.read_daily_health_from_path(&path)?;
64 Ok(records
65 .iter()
66 .any(|r| r.profile_id == profile_id && r.date == date))
67 }
68
69 pub fn has_performance_metrics(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
71 let key = EntityType::PerformanceMetrics.partition_key(date);
72 let path = self.partition_path(EntityType::PerformanceMetrics, &key);
73 if !path.exists() {
74 return Ok(false);
75 }
76 let records = self.read_performance_metrics_from_path(&path)?;
77 Ok(records
78 .iter()
79 .any(|r| r.profile_id == profile_id && r.date == date))
80 }
81
82 pub fn has_track_points(&self, activity_id: i64, date: NaiveDate) -> Result<bool> {
84 let key = EntityType::TrackPoints.partition_key(date);
85 let path = self.partition_path(EntityType::TrackPoints, &key);
86 if !path.exists() {
87 return Ok(false);
88 }
89 let records = self.read_track_points_from_path(&path)?;
90 Ok(records.iter().any(|r| r.activity_id == activity_id))
91 }
92
93 pub fn partition_path(&self, entity: EntityType, partition_key: &str) -> PathBuf {
95 match entity {
96 EntityType::Profiles => self.base_path.join("profiles.parquet"),
97 _ => self
98 .base_path
99 .join(entity.dir_name())
100 .join(format!("{}.parquet", partition_key)),
101 }
102 }
103
104 fn ensure_dir(&self, entity: EntityType) -> Result<()> {
106 if entity != EntityType::Profiles {
107 let dir = self.base_path.join(entity.dir_name());
108 fs::create_dir_all(&dir).map_err(|e| {
109 GarminError::Database(format!("Failed to create directory {:?}: {}", dir, e))
110 })?;
111 } else {
112 fs::create_dir_all(&self.base_path).map_err(|e| {
113 GarminError::Database(format!(
114 "Failed to create directory {:?}: {}",
115 self.base_path, e
116 ))
117 })?;
118 }
119 Ok(())
120 }
121
122 fn write_batch(&self, path: &Path, batch: &RecordBatch) -> Result<()> {
124 let temp_path = path.with_extension("parquet.tmp");
126
127 if let Some(parent) = path.parent() {
129 fs::create_dir_all(parent)
130 .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
131 }
132
133 let file = File::create(&temp_path)
134 .map_err(|e| GarminError::Database(format!("Failed to create temp file: {}", e)))?;
135
136 let props = WriterProperties::builder()
137 .set_compression(Compression::ZSTD(Default::default()))
138 .build();
139
140 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
141 GarminError::Database(format!("Failed to create Parquet writer: {}", e))
142 })?;
143
144 writer
145 .write(batch)
146 .map_err(|e| GarminError::Database(format!("Failed to write batch: {}", e)))?;
147
148 writer
149 .close()
150 .map_err(|e| GarminError::Database(format!("Failed to close writer: {}", e)))?;
151
152 fs::rename(&temp_path, path)
154 .map_err(|e| GarminError::Database(format!("Failed to rename temp file: {}", e)))?;
155
156 Ok(())
157 }
158
159 fn read_batches(&self, path: &Path) -> Result<Vec<RecordBatch>> {
161 if !path.exists() {
162 return Ok(Vec::new());
163 }
164
165 let file = File::open(path)
166 .map_err(|e| GarminError::Database(format!("Failed to open file: {}", e)))?;
167
168 let reader = ParquetRecordBatchReaderBuilder::try_new(file)
169 .map_err(|e| GarminError::Database(format!("Failed to create reader: {}", e)))?
170 .build()
171 .map_err(|e| GarminError::Database(format!("Failed to build reader: {}", e)))?;
172
173 reader
174 .collect::<std::result::Result<Vec<_>, _>>()
175 .map_err(|e| GarminError::Database(format!("Failed to read batches: {}", e)))
176 }
177
178 pub fn write_activities(&self, activities: &[Activity]) -> Result<()> {
184 self.ensure_dir(EntityType::Activities)?;
185
186 let mut partitions: std::collections::HashMap<String, Vec<&Activity>> =
188 std::collections::HashMap::new();
189
190 for activity in activities {
191 if let Some(start_time) = activity.start_time_local {
192 let key = EntityType::Activities.partition_key(start_time.date_naive());
193 partitions.entry(key).or_default().push(activity);
194 }
195 }
196
197 for (key, partition_activities) in partitions {
199 let path = self.partition_path(EntityType::Activities, &key);
200 let batch = Self::activities_to_batch(partition_activities)?;
201 self.write_batch(&path, &batch)?;
202 }
203
204 Ok(())
205 }
206
207 pub fn upsert_activities(&self, activities: &[Activity]) -> Result<()> {
209 self.ensure_dir(EntityType::Activities)?;
210
211 let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
213 std::collections::HashMap::new();
214
215 for activity in activities {
216 if let Some(start_time) = activity.start_time_local {
217 let key = EntityType::Activities.partition_key(start_time.date_naive());
218 partitions.entry(key).or_default().push(activity.clone());
219 }
220 }
221
222 for (key, mut new_activities) in partitions {
224 let path = self.partition_path(EntityType::Activities, &key);
225
226 let mut existing = self.read_activities_from_path(&path)?;
228
229 let new_ids: std::collections::HashSet<i64> =
231 new_activities.iter().map(|a| a.activity_id).collect();
232
233 existing.retain(|a| !new_ids.contains(&a.activity_id));
235
236 existing.append(&mut new_activities);
238
239 existing.sort_by_key(|a| a.activity_id);
241
242 let refs: Vec<&Activity> = existing.iter().collect();
244 let batch = Self::activities_to_batch(refs)?;
245 self.write_batch(&path, &batch)?;
246 }
247
248 Ok(())
249 }
250
251 pub async fn upsert_activities_async(&self, activities: &[Activity]) -> Result<()> {
253 self.ensure_dir(EntityType::Activities)?;
254
255 let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
257 std::collections::HashMap::new();
258
259 for activity in activities {
260 if let Some(start_time) = activity.start_time_local {
261 let key = EntityType::Activities.partition_key(start_time.date_naive());
262 partitions.entry(key).or_default().push(activity.clone());
263 }
264 }
265
266 for (key, mut new_activities) in partitions {
268 let lock = self.get_partition_lock(&key);
270 let _guard = lock.lock().await;
271
272 let path = self.partition_path(EntityType::Activities, &key);
273
274 let mut existing = self.read_activities_from_path(&path)?;
276
277 let new_ids: std::collections::HashSet<i64> =
279 new_activities.iter().map(|a| a.activity_id).collect();
280
281 existing.retain(|a| !new_ids.contains(&a.activity_id));
283
284 existing.append(&mut new_activities);
286
287 existing.sort_by_key(|a| a.activity_id);
289
290 let refs: Vec<&Activity> = existing.iter().collect();
292 let batch = Self::activities_to_batch(refs)?;
293 self.write_batch(&path, &batch)?;
294 }
295
296 Ok(())
297 }
298
299 fn read_activities_from_path(&self, path: &Path) -> Result<Vec<Activity>> {
300 let batches = self.read_batches(path)?;
301 let mut activities = Vec::new();
302
303 for batch in batches {
304 activities.extend(Self::batch_to_activities(&batch)?);
305 }
306
307 Ok(activities)
308 }
309
310 fn activities_to_batch(activities: Vec<&Activity>) -> Result<RecordBatch> {
311 let activity_id: Int64Array = activities.iter().map(|a| a.activity_id).collect();
312 let profile_id: Int32Array = activities.iter().map(|a| a.profile_id).collect();
313 let activity_name: StringArray = activities
314 .iter()
315 .map(|a| a.activity_name.as_deref())
316 .collect();
317 let activity_type: StringArray = activities
318 .iter()
319 .map(|a| a.activity_type.as_deref())
320 .collect();
321 let start_time_local: TimestampMicrosecondArray = activities
322 .iter()
323 .map(|a| a.start_time_local.map(|t| t.timestamp_micros()))
324 .collect();
325 let start_time_gmt: TimestampMicrosecondArray = activities
326 .iter()
327 .map(|a| a.start_time_gmt.map(|t| t.timestamp_micros()))
328 .collect();
329 let duration_sec: Float64Array = activities.iter().map(|a| a.duration_sec).collect();
330 let distance_m: Float64Array = activities.iter().map(|a| a.distance_m).collect();
331 let calories: Int32Array = activities.iter().map(|a| a.calories).collect();
332 let avg_hr: Int32Array = activities.iter().map(|a| a.avg_hr).collect();
333 let max_hr: Int32Array = activities.iter().map(|a| a.max_hr).collect();
334 let avg_speed: Float64Array = activities.iter().map(|a| a.avg_speed).collect();
335 let max_speed: Float64Array = activities.iter().map(|a| a.max_speed).collect();
336 let elevation_gain: Float64Array = activities.iter().map(|a| a.elevation_gain).collect();
337 let elevation_loss: Float64Array = activities.iter().map(|a| a.elevation_loss).collect();
338 let avg_cadence: Float64Array = activities.iter().map(|a| a.avg_cadence).collect();
339 let avg_power: Int32Array = activities.iter().map(|a| a.avg_power).collect();
340 let normalized_power: Int32Array = activities.iter().map(|a| a.normalized_power).collect();
341 let training_effect: Float64Array = activities.iter().map(|a| a.training_effect).collect();
342 let training_load: Float64Array = activities.iter().map(|a| a.training_load).collect();
343 let start_lat: Float64Array = activities.iter().map(|a| a.start_lat).collect();
344 let start_lon: Float64Array = activities.iter().map(|a| a.start_lon).collect();
345 let end_lat: Float64Array = activities.iter().map(|a| a.end_lat).collect();
346 let end_lon: Float64Array = activities.iter().map(|a| a.end_lon).collect();
347 let ground_contact_time: Float64Array =
348 activities.iter().map(|a| a.ground_contact_time).collect();
349 let vertical_oscillation: Float64Array =
350 activities.iter().map(|a| a.vertical_oscillation).collect();
351 let stride_length: Float64Array = activities.iter().map(|a| a.stride_length).collect();
352 let location_name: StringArray = activities
353 .iter()
354 .map(|a| a.location_name.as_deref())
355 .collect();
356 let raw_json: StringArray = activities
357 .iter()
358 .map(|a| a.raw_json.as_ref().map(|j| j.to_string()))
359 .collect();
360
361 let schema = Arc::new(Schema::new(vec![
362 Field::new("activity_id", DataType::Int64, false),
363 Field::new("profile_id", DataType::Int32, false),
364 Field::new("activity_name", DataType::Utf8, true),
365 Field::new("activity_type", DataType::Utf8, true),
366 Field::new(
367 "start_time_local",
368 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
369 true,
370 ),
371 Field::new(
372 "start_time_gmt",
373 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
374 true,
375 ),
376 Field::new("duration_sec", DataType::Float64, true),
377 Field::new("distance_m", DataType::Float64, true),
378 Field::new("calories", DataType::Int32, true),
379 Field::new("avg_hr", DataType::Int32, true),
380 Field::new("max_hr", DataType::Int32, true),
381 Field::new("avg_speed", DataType::Float64, true),
382 Field::new("max_speed", DataType::Float64, true),
383 Field::new("elevation_gain", DataType::Float64, true),
384 Field::new("elevation_loss", DataType::Float64, true),
385 Field::new("avg_cadence", DataType::Float64, true),
386 Field::new("avg_power", DataType::Int32, true),
387 Field::new("normalized_power", DataType::Int32, true),
388 Field::new("training_effect", DataType::Float64, true),
389 Field::new("training_load", DataType::Float64, true),
390 Field::new("start_lat", DataType::Float64, true),
391 Field::new("start_lon", DataType::Float64, true),
392 Field::new("end_lat", DataType::Float64, true),
393 Field::new("end_lon", DataType::Float64, true),
394 Field::new("ground_contact_time", DataType::Float64, true),
395 Field::new("vertical_oscillation", DataType::Float64, true),
396 Field::new("stride_length", DataType::Float64, true),
397 Field::new("location_name", DataType::Utf8, true),
398 Field::new("raw_json", DataType::Utf8, true),
399 ]));
400
401 RecordBatch::try_new(
402 schema,
403 vec![
404 Arc::new(activity_id),
405 Arc::new(profile_id),
406 Arc::new(activity_name),
407 Arc::new(activity_type),
408 Arc::new(start_time_local),
409 Arc::new(start_time_gmt),
410 Arc::new(duration_sec),
411 Arc::new(distance_m),
412 Arc::new(calories),
413 Arc::new(avg_hr),
414 Arc::new(max_hr),
415 Arc::new(avg_speed),
416 Arc::new(max_speed),
417 Arc::new(elevation_gain),
418 Arc::new(elevation_loss),
419 Arc::new(avg_cadence),
420 Arc::new(avg_power),
421 Arc::new(normalized_power),
422 Arc::new(training_effect),
423 Arc::new(training_load),
424 Arc::new(start_lat),
425 Arc::new(start_lon),
426 Arc::new(end_lat),
427 Arc::new(end_lon),
428 Arc::new(ground_contact_time),
429 Arc::new(vertical_oscillation),
430 Arc::new(stride_length),
431 Arc::new(location_name),
432 Arc::new(raw_json),
433 ],
434 )
435 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
436 }
437
438 fn batch_to_activities(batch: &RecordBatch) -> Result<Vec<Activity>> {
439 let len = batch.num_rows();
440 let mut activities = Vec::with_capacity(len);
441
442 let activity_id = batch
443 .column(0)
444 .as_any()
445 .downcast_ref::<Int64Array>()
446 .unwrap();
447 let profile_id = batch
448 .column(1)
449 .as_any()
450 .downcast_ref::<Int32Array>()
451 .unwrap();
452 let activity_name = batch
453 .column(2)
454 .as_any()
455 .downcast_ref::<StringArray>()
456 .unwrap();
457 let activity_type = batch
458 .column(3)
459 .as_any()
460 .downcast_ref::<StringArray>()
461 .unwrap();
462 let start_time_local = batch
463 .column(4)
464 .as_any()
465 .downcast_ref::<TimestampMicrosecondArray>()
466 .unwrap();
467 let start_time_gmt = batch
468 .column(5)
469 .as_any()
470 .downcast_ref::<TimestampMicrosecondArray>()
471 .unwrap();
472 let duration_sec = batch
473 .column(6)
474 .as_any()
475 .downcast_ref::<Float64Array>()
476 .unwrap();
477 let distance_m = batch
478 .column(7)
479 .as_any()
480 .downcast_ref::<Float64Array>()
481 .unwrap();
482 let calories = batch
483 .column(8)
484 .as_any()
485 .downcast_ref::<Int32Array>()
486 .unwrap();
487 let avg_hr = batch
488 .column(9)
489 .as_any()
490 .downcast_ref::<Int32Array>()
491 .unwrap();
492 let max_hr = batch
493 .column(10)
494 .as_any()
495 .downcast_ref::<Int32Array>()
496 .unwrap();
497 let avg_speed = batch
498 .column(11)
499 .as_any()
500 .downcast_ref::<Float64Array>()
501 .unwrap();
502 let max_speed = batch
503 .column(12)
504 .as_any()
505 .downcast_ref::<Float64Array>()
506 .unwrap();
507 let elevation_gain = batch
508 .column(13)
509 .as_any()
510 .downcast_ref::<Float64Array>()
511 .unwrap();
512 let elevation_loss = batch
513 .column(14)
514 .as_any()
515 .downcast_ref::<Float64Array>()
516 .unwrap();
517 let avg_cadence = batch
518 .column(15)
519 .as_any()
520 .downcast_ref::<Float64Array>()
521 .unwrap();
522 let avg_power = batch
523 .column(16)
524 .as_any()
525 .downcast_ref::<Int32Array>()
526 .unwrap();
527 let normalized_power = batch
528 .column(17)
529 .as_any()
530 .downcast_ref::<Int32Array>()
531 .unwrap();
532 let training_effect = batch
533 .column(18)
534 .as_any()
535 .downcast_ref::<Float64Array>()
536 .unwrap();
537 let training_load = batch
538 .column(19)
539 .as_any()
540 .downcast_ref::<Float64Array>()
541 .unwrap();
542 let start_lat = batch
543 .column(20)
544 .as_any()
545 .downcast_ref::<Float64Array>()
546 .unwrap();
547 let start_lon = batch
548 .column(21)
549 .as_any()
550 .downcast_ref::<Float64Array>()
551 .unwrap();
552 let end_lat = batch
553 .column(22)
554 .as_any()
555 .downcast_ref::<Float64Array>()
556 .unwrap();
557 let end_lon = batch
558 .column(23)
559 .as_any()
560 .downcast_ref::<Float64Array>()
561 .unwrap();
562 let ground_contact_time = batch
563 .column(24)
564 .as_any()
565 .downcast_ref::<Float64Array>()
566 .unwrap();
567 let vertical_oscillation = batch
568 .column(25)
569 .as_any()
570 .downcast_ref::<Float64Array>()
571 .unwrap();
572 let stride_length = batch
573 .column(26)
574 .as_any()
575 .downcast_ref::<Float64Array>()
576 .unwrap();
577 let location_name = batch
578 .column(27)
579 .as_any()
580 .downcast_ref::<StringArray>()
581 .unwrap();
582 let raw_json = batch
583 .column(28)
584 .as_any()
585 .downcast_ref::<StringArray>()
586 .unwrap();
587
588 for i in 0..len {
589 activities.push(Activity {
590 activity_id: activity_id.value(i),
591 profile_id: profile_id.value(i),
592 activity_name: activity_name
593 .is_valid(i)
594 .then(|| activity_name.value(i).to_string()),
595 activity_type: activity_type
596 .is_valid(i)
597 .then(|| activity_type.value(i).to_string()),
598 start_time_local: start_time_local.is_valid(i).then(|| {
599 DateTime::from_timestamp_micros(start_time_local.value(i)).unwrap_or_default()
600 }),
601 start_time_gmt: start_time_gmt.is_valid(i).then(|| {
602 DateTime::from_timestamp_micros(start_time_gmt.value(i)).unwrap_or_default()
603 }),
604 duration_sec: duration_sec.is_valid(i).then(|| duration_sec.value(i)),
605 distance_m: distance_m.is_valid(i).then(|| distance_m.value(i)),
606 calories: calories.is_valid(i).then(|| calories.value(i)),
607 avg_hr: avg_hr.is_valid(i).then(|| avg_hr.value(i)),
608 max_hr: max_hr.is_valid(i).then(|| max_hr.value(i)),
609 avg_speed: avg_speed.is_valid(i).then(|| avg_speed.value(i)),
610 max_speed: max_speed.is_valid(i).then(|| max_speed.value(i)),
611 elevation_gain: elevation_gain.is_valid(i).then(|| elevation_gain.value(i)),
612 elevation_loss: elevation_loss.is_valid(i).then(|| elevation_loss.value(i)),
613 avg_cadence: avg_cadence.is_valid(i).then(|| avg_cadence.value(i)),
614 avg_power: avg_power.is_valid(i).then(|| avg_power.value(i)),
615 normalized_power: normalized_power
616 .is_valid(i)
617 .then(|| normalized_power.value(i)),
618 training_effect: training_effect
619 .is_valid(i)
620 .then(|| training_effect.value(i)),
621 training_load: training_load.is_valid(i).then(|| training_load.value(i)),
622 start_lat: start_lat.is_valid(i).then(|| start_lat.value(i)),
623 start_lon: start_lon.is_valid(i).then(|| start_lon.value(i)),
624 end_lat: end_lat.is_valid(i).then(|| end_lat.value(i)),
625 end_lon: end_lon.is_valid(i).then(|| end_lon.value(i)),
626 ground_contact_time: ground_contact_time
627 .is_valid(i)
628 .then(|| ground_contact_time.value(i)),
629 vertical_oscillation: vertical_oscillation
630 .is_valid(i)
631 .then(|| vertical_oscillation.value(i)),
632 stride_length: stride_length.is_valid(i).then(|| stride_length.value(i)),
633 location_name: location_name
634 .is_valid(i)
635 .then(|| location_name.value(i).to_string()),
636 raw_json: raw_json
637 .is_valid(i)
638 .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
639 });
640 }
641
642 Ok(activities)
643 }
644
645 pub fn upsert_daily_health(&self, records: &[DailyHealth]) -> Result<()> {
651 self.ensure_dir(EntityType::DailyHealth)?;
652
653 let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
655 std::collections::HashMap::new();
656
657 for record in records {
658 let key = EntityType::DailyHealth.partition_key(record.date);
659 partitions.entry(key).or_default().push(record.clone());
660 }
661
662 for (key, mut new_records) in partitions {
664 let path = self.partition_path(EntityType::DailyHealth, &key);
665
666 let mut existing = self.read_daily_health_from_path(&path)?;
668
669 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
671 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
672
673 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
675
676 existing.append(&mut new_records);
678
679 existing.sort_by_key(|r| (r.profile_id, r.date));
681
682 let refs: Vec<&DailyHealth> = existing.iter().collect();
684 let batch = Self::daily_health_to_batch(refs)?;
685 self.write_batch(&path, &batch)?;
686 }
687
688 Ok(())
689 }
690
691 pub async fn upsert_daily_health_async(&self, records: &[DailyHealth]) -> Result<()> {
693 self.ensure_dir(EntityType::DailyHealth)?;
694
695 let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
697 std::collections::HashMap::new();
698
699 for record in records {
700 let key = EntityType::DailyHealth.partition_key(record.date);
701 partitions.entry(key).or_default().push(record.clone());
702 }
703
704 for (key, mut new_records) in partitions {
706 let lock = self.get_partition_lock(&key);
708 let _guard = lock.lock().await;
709
710 let path = self.partition_path(EntityType::DailyHealth, &key);
711
712 let mut existing = self.read_daily_health_from_path(&path)?;
714
715 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
717 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
718
719 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
721
722 existing.append(&mut new_records);
724
725 existing.sort_by_key(|r| (r.profile_id, r.date));
727
728 let refs: Vec<&DailyHealth> = existing.iter().collect();
730 let batch = Self::daily_health_to_batch(refs)?;
731 self.write_batch(&path, &batch)?;
732 }
733
734 Ok(())
735 }
736
737 fn read_daily_health_from_path(&self, path: &Path) -> Result<Vec<DailyHealth>> {
738 let batches = self.read_batches(path)?;
739 let mut records = Vec::new();
740
741 for batch in batches {
742 records.extend(Self::batch_to_daily_health(&batch)?);
743 }
744
745 Ok(records)
746 }
747
748 fn daily_health_to_batch(records: Vec<&DailyHealth>) -> Result<RecordBatch> {
749 let id: Int64Array = records.iter().map(|r| r.id).collect();
750 let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
751 let date: Date32Array = records
752 .iter()
753 .map(|r| {
754 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
755 Some((r.date - epoch).num_days() as i32)
756 })
757 .collect();
758 let steps: Int32Array = records.iter().map(|r| r.steps).collect();
759 let step_goal: Int32Array = records.iter().map(|r| r.step_goal).collect();
760 let total_calories: Int32Array = records.iter().map(|r| r.total_calories).collect();
761 let active_calories: Int32Array = records.iter().map(|r| r.active_calories).collect();
762 let bmr_calories: Int32Array = records.iter().map(|r| r.bmr_calories).collect();
763 let resting_hr: Int32Array = records.iter().map(|r| r.resting_hr).collect();
764 let sleep_seconds: Int32Array = records.iter().map(|r| r.sleep_seconds).collect();
765 let deep_sleep_seconds: Int32Array = records.iter().map(|r| r.deep_sleep_seconds).collect();
766 let light_sleep_seconds: Int32Array =
767 records.iter().map(|r| r.light_sleep_seconds).collect();
768 let rem_sleep_seconds: Int32Array = records.iter().map(|r| r.rem_sleep_seconds).collect();
769 let sleep_score: Int32Array = records.iter().map(|r| r.sleep_score).collect();
770 let avg_stress: Int32Array = records.iter().map(|r| r.avg_stress).collect();
771 let max_stress: Int32Array = records.iter().map(|r| r.max_stress).collect();
772 let body_battery_start: Int32Array = records.iter().map(|r| r.body_battery_start).collect();
773 let body_battery_end: Int32Array = records.iter().map(|r| r.body_battery_end).collect();
774 let hrv_weekly_avg: Int32Array = records.iter().map(|r| r.hrv_weekly_avg).collect();
775 let hrv_last_night: Int32Array = records.iter().map(|r| r.hrv_last_night).collect();
776 let hrv_status: StringArray = records.iter().map(|r| r.hrv_status.as_deref()).collect();
777 let avg_respiration: Float64Array = records.iter().map(|r| r.avg_respiration).collect();
778 let avg_spo2: Int32Array = records.iter().map(|r| r.avg_spo2).collect();
779 let lowest_spo2: Int32Array = records.iter().map(|r| r.lowest_spo2).collect();
780 let hydration_ml: Int32Array = records.iter().map(|r| r.hydration_ml).collect();
781 let moderate_intensity_min: Int32Array =
782 records.iter().map(|r| r.moderate_intensity_min).collect();
783 let vigorous_intensity_min: Int32Array =
784 records.iter().map(|r| r.vigorous_intensity_min).collect();
785 let raw_json: StringArray = records
786 .iter()
787 .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
788 .collect();
789 let sleep_note: StringArray = records.iter().map(|r| r.sleep_note.as_deref()).collect();
790
791 let schema = Arc::new(Schema::new(vec![
792 Field::new("id", DataType::Int64, true),
793 Field::new("profile_id", DataType::Int32, false),
794 Field::new("date", DataType::Date32, false),
795 Field::new("steps", DataType::Int32, true),
796 Field::new("step_goal", DataType::Int32, true),
797 Field::new("total_calories", DataType::Int32, true),
798 Field::new("active_calories", DataType::Int32, true),
799 Field::new("bmr_calories", DataType::Int32, true),
800 Field::new("resting_hr", DataType::Int32, true),
801 Field::new("sleep_seconds", DataType::Int32, true),
802 Field::new("deep_sleep_seconds", DataType::Int32, true),
803 Field::new("light_sleep_seconds", DataType::Int32, true),
804 Field::new("rem_sleep_seconds", DataType::Int32, true),
805 Field::new("sleep_score", DataType::Int32, true),
806 Field::new("avg_stress", DataType::Int32, true),
807 Field::new("max_stress", DataType::Int32, true),
808 Field::new("body_battery_start", DataType::Int32, true),
809 Field::new("body_battery_end", DataType::Int32, true),
810 Field::new("hrv_weekly_avg", DataType::Int32, true),
811 Field::new("hrv_last_night", DataType::Int32, true),
812 Field::new("hrv_status", DataType::Utf8, true),
813 Field::new("avg_respiration", DataType::Float64, true),
814 Field::new("avg_spo2", DataType::Int32, true),
815 Field::new("lowest_spo2", DataType::Int32, true),
816 Field::new("hydration_ml", DataType::Int32, true),
817 Field::new("moderate_intensity_min", DataType::Int32, true),
818 Field::new("vigorous_intensity_min", DataType::Int32, true),
819 Field::new("raw_json", DataType::Utf8, true),
820 Field::new("sleep_note", DataType::Utf8, true),
821 ]));
822
823 RecordBatch::try_new(
824 schema,
825 vec![
826 Arc::new(id),
827 Arc::new(profile_id),
828 Arc::new(date),
829 Arc::new(steps),
830 Arc::new(step_goal),
831 Arc::new(total_calories),
832 Arc::new(active_calories),
833 Arc::new(bmr_calories),
834 Arc::new(resting_hr),
835 Arc::new(sleep_seconds),
836 Arc::new(deep_sleep_seconds),
837 Arc::new(light_sleep_seconds),
838 Arc::new(rem_sleep_seconds),
839 Arc::new(sleep_score),
840 Arc::new(avg_stress),
841 Arc::new(max_stress),
842 Arc::new(body_battery_start),
843 Arc::new(body_battery_end),
844 Arc::new(hrv_weekly_avg),
845 Arc::new(hrv_last_night),
846 Arc::new(hrv_status),
847 Arc::new(avg_respiration),
848 Arc::new(avg_spo2),
849 Arc::new(lowest_spo2),
850 Arc::new(hydration_ml),
851 Arc::new(moderate_intensity_min),
852 Arc::new(vigorous_intensity_min),
853 Arc::new(raw_json),
854 Arc::new(sleep_note),
855 ],
856 )
857 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
858 }
859
860 fn batch_to_daily_health(batch: &RecordBatch) -> Result<Vec<DailyHealth>> {
861 let len = batch.num_rows();
862 let mut records = Vec::with_capacity(len);
863
864 let id = batch
865 .column(0)
866 .as_any()
867 .downcast_ref::<Int64Array>()
868 .unwrap();
869 let profile_id = batch
870 .column(1)
871 .as_any()
872 .downcast_ref::<Int32Array>()
873 .unwrap();
874 let date = batch
875 .column(2)
876 .as_any()
877 .downcast_ref::<Date32Array>()
878 .unwrap();
879 let steps = batch
880 .column(3)
881 .as_any()
882 .downcast_ref::<Int32Array>()
883 .unwrap();
884 let step_goal = batch
885 .column(4)
886 .as_any()
887 .downcast_ref::<Int32Array>()
888 .unwrap();
889 let total_calories = batch
890 .column(5)
891 .as_any()
892 .downcast_ref::<Int32Array>()
893 .unwrap();
894 let active_calories = batch
895 .column(6)
896 .as_any()
897 .downcast_ref::<Int32Array>()
898 .unwrap();
899 let bmr_calories = batch
900 .column(7)
901 .as_any()
902 .downcast_ref::<Int32Array>()
903 .unwrap();
904 let resting_hr = batch
905 .column(8)
906 .as_any()
907 .downcast_ref::<Int32Array>()
908 .unwrap();
909 let sleep_seconds = batch
910 .column(9)
911 .as_any()
912 .downcast_ref::<Int32Array>()
913 .unwrap();
914 let deep_sleep_seconds = batch
915 .column(10)
916 .as_any()
917 .downcast_ref::<Int32Array>()
918 .unwrap();
919 let light_sleep_seconds = batch
920 .column(11)
921 .as_any()
922 .downcast_ref::<Int32Array>()
923 .unwrap();
924 let rem_sleep_seconds = batch
925 .column(12)
926 .as_any()
927 .downcast_ref::<Int32Array>()
928 .unwrap();
929 let sleep_score = batch
930 .column(13)
931 .as_any()
932 .downcast_ref::<Int32Array>()
933 .unwrap();
934 let avg_stress = batch
935 .column(14)
936 .as_any()
937 .downcast_ref::<Int32Array>()
938 .unwrap();
939 let max_stress = batch
940 .column(15)
941 .as_any()
942 .downcast_ref::<Int32Array>()
943 .unwrap();
944 let body_battery_start = batch
945 .column(16)
946 .as_any()
947 .downcast_ref::<Int32Array>()
948 .unwrap();
949 let body_battery_end = batch
950 .column(17)
951 .as_any()
952 .downcast_ref::<Int32Array>()
953 .unwrap();
954 let hrv_weekly_avg = batch
955 .column(18)
956 .as_any()
957 .downcast_ref::<Int32Array>()
958 .unwrap();
959 let hrv_last_night = batch
960 .column(19)
961 .as_any()
962 .downcast_ref::<Int32Array>()
963 .unwrap();
964 let hrv_status = batch
965 .column(20)
966 .as_any()
967 .downcast_ref::<StringArray>()
968 .unwrap();
969 let avg_respiration = batch
970 .column(21)
971 .as_any()
972 .downcast_ref::<Float64Array>()
973 .unwrap();
974 let avg_spo2 = batch
975 .column(22)
976 .as_any()
977 .downcast_ref::<Int32Array>()
978 .unwrap();
979 let lowest_spo2 = batch
980 .column(23)
981 .as_any()
982 .downcast_ref::<Int32Array>()
983 .unwrap();
984 let hydration_ml = batch
985 .column(24)
986 .as_any()
987 .downcast_ref::<Int32Array>()
988 .unwrap();
989 let moderate_intensity_min = batch
990 .column(25)
991 .as_any()
992 .downcast_ref::<Int32Array>()
993 .unwrap();
994 let vigorous_intensity_min = batch
995 .column(26)
996 .as_any()
997 .downcast_ref::<Int32Array>()
998 .unwrap();
999 let raw_json = batch
1000 .column(27)
1001 .as_any()
1002 .downcast_ref::<StringArray>()
1003 .unwrap();
1004 let sleep_note = batch
1007 .column_by_name("sleep_note")
1008 .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1009
1010 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1011
1012 for i in 0..len {
1013 records.push(DailyHealth {
1014 id: id.is_valid(i).then(|| id.value(i)),
1015 profile_id: profile_id.value(i),
1016 date: epoch + chrono::Duration::days(date.value(i) as i64),
1017 steps: steps.is_valid(i).then(|| steps.value(i)),
1018 step_goal: step_goal.is_valid(i).then(|| step_goal.value(i)),
1019 total_calories: total_calories.is_valid(i).then(|| total_calories.value(i)),
1020 active_calories: active_calories
1021 .is_valid(i)
1022 .then(|| active_calories.value(i)),
1023 bmr_calories: bmr_calories.is_valid(i).then(|| bmr_calories.value(i)),
1024 resting_hr: resting_hr.is_valid(i).then(|| resting_hr.value(i)),
1025 sleep_seconds: sleep_seconds.is_valid(i).then(|| sleep_seconds.value(i)),
1026 deep_sleep_seconds: deep_sleep_seconds
1027 .is_valid(i)
1028 .then(|| deep_sleep_seconds.value(i)),
1029 light_sleep_seconds: light_sleep_seconds
1030 .is_valid(i)
1031 .then(|| light_sleep_seconds.value(i)),
1032 rem_sleep_seconds: rem_sleep_seconds
1033 .is_valid(i)
1034 .then(|| rem_sleep_seconds.value(i)),
1035 sleep_score: sleep_score.is_valid(i).then(|| sleep_score.value(i)),
1036 sleep_note: sleep_note
1037 .and_then(|arr| arr.is_valid(i).then(|| arr.value(i).to_string())),
1038 avg_stress: avg_stress.is_valid(i).then(|| avg_stress.value(i)),
1039 max_stress: max_stress.is_valid(i).then(|| max_stress.value(i)),
1040 body_battery_start: body_battery_start
1041 .is_valid(i)
1042 .then(|| body_battery_start.value(i)),
1043 body_battery_end: body_battery_end
1044 .is_valid(i)
1045 .then(|| body_battery_end.value(i)),
1046 hrv_weekly_avg: hrv_weekly_avg.is_valid(i).then(|| hrv_weekly_avg.value(i)),
1047 hrv_last_night: hrv_last_night.is_valid(i).then(|| hrv_last_night.value(i)),
1048 hrv_status: hrv_status
1049 .is_valid(i)
1050 .then(|| hrv_status.value(i).to_string()),
1051 avg_respiration: avg_respiration
1052 .is_valid(i)
1053 .then(|| avg_respiration.value(i)),
1054 avg_spo2: avg_spo2.is_valid(i).then(|| avg_spo2.value(i)),
1055 lowest_spo2: lowest_spo2.is_valid(i).then(|| lowest_spo2.value(i)),
1056 hydration_ml: hydration_ml.is_valid(i).then(|| hydration_ml.value(i)),
1057 moderate_intensity_min: moderate_intensity_min
1058 .is_valid(i)
1059 .then(|| moderate_intensity_min.value(i)),
1060 vigorous_intensity_min: vigorous_intensity_min
1061 .is_valid(i)
1062 .then(|| vigorous_intensity_min.value(i)),
1063 raw_json: raw_json
1064 .is_valid(i)
1065 .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
1066 });
1067 }
1068
1069 Ok(records)
1070 }
1071
1072 pub fn write_track_points(
1078 &self,
1079 activity_date: NaiveDate,
1080 points: &[TrackPoint],
1081 ) -> Result<()> {
1082 self.ensure_dir(EntityType::TrackPoints)?;
1083
1084 let key = EntityType::TrackPoints.partition_key(activity_date);
1085 let path = self.partition_path(EntityType::TrackPoints, &key);
1086
1087 let mut existing = self.read_track_points_from_path(&path)?;
1089
1090 if let Some(first) = points.first() {
1091 existing.retain(|p| p.activity_id != first.activity_id);
1092 }
1093
1094 existing.extend(points.iter().cloned());
1095
1096 existing.sort_by(|a, b| {
1098 a.activity_id
1099 .cmp(&b.activity_id)
1100 .then(a.timestamp.cmp(&b.timestamp))
1101 });
1102
1103 let refs: Vec<&TrackPoint> = existing.iter().collect();
1104 let batch = Self::track_points_to_batch(refs)?;
1105 self.write_batch(&path, &batch)?;
1106
1107 Ok(())
1108 }
1109
1110 pub async fn write_track_points_async(
1112 &self,
1113 activity_date: NaiveDate,
1114 points: &[TrackPoint],
1115 ) -> Result<()> {
1116 self.ensure_dir(EntityType::TrackPoints)?;
1117
1118 let key = EntityType::TrackPoints.partition_key(activity_date);
1119
1120 let lock = self.get_partition_lock(&key);
1122 let _guard = lock.lock().await;
1123
1124 let path = self.partition_path(EntityType::TrackPoints, &key);
1125
1126 let mut existing = self.read_track_points_from_path(&path)?;
1128
1129 if let Some(first) = points.first() {
1130 existing.retain(|p| p.activity_id != first.activity_id);
1131 }
1132
1133 existing.extend(points.iter().cloned());
1134
1135 existing.sort_by(|a, b| {
1137 a.activity_id
1138 .cmp(&b.activity_id)
1139 .then(a.timestamp.cmp(&b.timestamp))
1140 });
1141
1142 let refs: Vec<&TrackPoint> = existing.iter().collect();
1143 let batch = Self::track_points_to_batch(refs)?;
1144 self.write_batch(&path, &batch)?;
1145
1146 Ok(())
1147 }
1148
1149 fn read_track_points_from_path(&self, path: &Path) -> Result<Vec<TrackPoint>> {
1150 let batches = self.read_batches(path)?;
1151 let mut points = Vec::new();
1152
1153 for batch in batches {
1154 points.extend(Self::batch_to_track_points(&batch)?);
1155 }
1156
1157 Ok(points)
1158 }
1159
1160 fn track_points_to_batch(points: Vec<&TrackPoint>) -> Result<RecordBatch> {
1161 let id: Int64Array = points.iter().map(|p| p.id).collect();
1162 let activity_id: Int64Array = points.iter().map(|p| p.activity_id).collect();
1163 let timestamp: TimestampMicrosecondArray = points
1164 .iter()
1165 .map(|p| Some(p.timestamp.timestamp_micros()))
1166 .collect();
1167 let lat: Float64Array = points.iter().map(|p| p.lat).collect();
1168 let lon: Float64Array = points.iter().map(|p| p.lon).collect();
1169 let elevation: Float64Array = points.iter().map(|p| p.elevation).collect();
1170 let heart_rate: Int32Array = points.iter().map(|p| p.heart_rate).collect();
1171 let cadence: Int32Array = points.iter().map(|p| p.cadence).collect();
1172 let power: Int32Array = points.iter().map(|p| p.power).collect();
1173 let speed: Float64Array = points.iter().map(|p| p.speed).collect();
1174
1175 let schema = Arc::new(Schema::new(vec![
1176 Field::new("id", DataType::Int64, true),
1177 Field::new("activity_id", DataType::Int64, false),
1178 Field::new(
1179 "timestamp",
1180 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1181 false,
1182 ),
1183 Field::new("lat", DataType::Float64, true),
1184 Field::new("lon", DataType::Float64, true),
1185 Field::new("elevation", DataType::Float64, true),
1186 Field::new("heart_rate", DataType::Int32, true),
1187 Field::new("cadence", DataType::Int32, true),
1188 Field::new("power", DataType::Int32, true),
1189 Field::new("speed", DataType::Float64, true),
1190 ]));
1191
1192 RecordBatch::try_new(
1193 schema,
1194 vec![
1195 Arc::new(id),
1196 Arc::new(activity_id),
1197 Arc::new(timestamp),
1198 Arc::new(lat),
1199 Arc::new(lon),
1200 Arc::new(elevation),
1201 Arc::new(heart_rate),
1202 Arc::new(cadence),
1203 Arc::new(power),
1204 Arc::new(speed),
1205 ],
1206 )
1207 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1208 }
1209
1210 fn batch_to_track_points(batch: &RecordBatch) -> Result<Vec<TrackPoint>> {
1211 let len = batch.num_rows();
1212 let mut points = Vec::with_capacity(len);
1213
1214 let id = batch
1215 .column(0)
1216 .as_any()
1217 .downcast_ref::<Int64Array>()
1218 .unwrap();
1219 let activity_id = batch
1220 .column(1)
1221 .as_any()
1222 .downcast_ref::<Int64Array>()
1223 .unwrap();
1224 let timestamp = batch
1225 .column(2)
1226 .as_any()
1227 .downcast_ref::<TimestampMicrosecondArray>()
1228 .unwrap();
1229 let lat = batch
1230 .column(3)
1231 .as_any()
1232 .downcast_ref::<Float64Array>()
1233 .unwrap();
1234 let lon = batch
1235 .column(4)
1236 .as_any()
1237 .downcast_ref::<Float64Array>()
1238 .unwrap();
1239 let elevation = batch
1240 .column(5)
1241 .as_any()
1242 .downcast_ref::<Float64Array>()
1243 .unwrap();
1244 let heart_rate = batch
1245 .column(6)
1246 .as_any()
1247 .downcast_ref::<Int32Array>()
1248 .unwrap();
1249 let cadence = batch
1250 .column(7)
1251 .as_any()
1252 .downcast_ref::<Int32Array>()
1253 .unwrap();
1254 let power = batch
1255 .column(8)
1256 .as_any()
1257 .downcast_ref::<Int32Array>()
1258 .unwrap();
1259 let speed = batch
1260 .column(9)
1261 .as_any()
1262 .downcast_ref::<Float64Array>()
1263 .unwrap();
1264
1265 for i in 0..len {
1266 points.push(TrackPoint {
1267 id: id.is_valid(i).then(|| id.value(i)),
1268 activity_id: activity_id.value(i),
1269 timestamp: DateTime::from_timestamp_micros(timestamp.value(i)).unwrap_or_default(),
1270 lat: lat.is_valid(i).then(|| lat.value(i)),
1271 lon: lon.is_valid(i).then(|| lon.value(i)),
1272 elevation: elevation.is_valid(i).then(|| elevation.value(i)),
1273 heart_rate: heart_rate.is_valid(i).then(|| heart_rate.value(i)),
1274 cadence: cadence.is_valid(i).then(|| cadence.value(i)),
1275 power: power.is_valid(i).then(|| power.value(i)),
1276 speed: speed.is_valid(i).then(|| speed.value(i)),
1277 });
1278 }
1279
1280 Ok(points)
1281 }
1282
1283 pub fn upsert_performance_metrics(&self, records: &[PerformanceMetrics]) -> Result<()> {
1289 self.ensure_dir(EntityType::PerformanceMetrics)?;
1290
1291 let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
1293 std::collections::HashMap::new();
1294
1295 for record in records {
1296 let key = EntityType::PerformanceMetrics.partition_key(record.date);
1297 partitions.entry(key).or_default().push(record.clone());
1298 }
1299
1300 for (key, mut new_records) in partitions {
1302 let path = self.partition_path(EntityType::PerformanceMetrics, &key);
1303
1304 let mut existing = self.read_performance_metrics_from_path(&path)?;
1306
1307 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1309 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1310
1311 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1313
1314 existing.append(&mut new_records);
1316
1317 existing.sort_by_key(|r| (r.profile_id, r.date));
1319
1320 let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
1322 let batch = Self::performance_metrics_to_batch(refs)?;
1323 self.write_batch(&path, &batch)?;
1324 }
1325
1326 Ok(())
1327 }
1328
1329 pub async fn upsert_performance_metrics_async(
1331 &self,
1332 records: &[PerformanceMetrics],
1333 ) -> Result<()> {
1334 self.ensure_dir(EntityType::PerformanceMetrics)?;
1335
1336 let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
1338 std::collections::HashMap::new();
1339
1340 for record in records {
1341 let key = EntityType::PerformanceMetrics.partition_key(record.date);
1342 partitions.entry(key).or_default().push(record.clone());
1343 }
1344
1345 for (key, mut new_records) in partitions {
1347 let lock = self.get_partition_lock(&key);
1349 let _guard = lock.lock().await;
1350
1351 let path = self.partition_path(EntityType::PerformanceMetrics, &key);
1352
1353 let mut existing = self.read_performance_metrics_from_path(&path)?;
1355
1356 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1358 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1359
1360 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1362
1363 existing.append(&mut new_records);
1365
1366 existing.sort_by_key(|r| (r.profile_id, r.date));
1368
1369 let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
1371 let batch = Self::performance_metrics_to_batch(refs)?;
1372 self.write_batch(&path, &batch)?;
1373 }
1374
1375 Ok(())
1376 }
1377
1378 fn read_performance_metrics_from_path(&self, path: &Path) -> Result<Vec<PerformanceMetrics>> {
1379 let batches = self.read_batches(path)?;
1380 let mut records = Vec::new();
1381
1382 for batch in batches {
1383 records.extend(Self::batch_to_performance_metrics(&batch)?);
1384 }
1385
1386 Ok(records)
1387 }
1388
1389 fn performance_metrics_to_batch(records: Vec<&PerformanceMetrics>) -> Result<RecordBatch> {
1390 let id: Int64Array = records.iter().map(|r| r.id).collect();
1391 let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
1392 let date: Date32Array = records
1393 .iter()
1394 .map(|r| {
1395 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1396 Some((r.date - epoch).num_days() as i32)
1397 })
1398 .collect();
1399 let vo2max: Float64Array = records.iter().map(|r| r.vo2max).collect();
1400 let fitness_age: Int32Array = records.iter().map(|r| r.fitness_age).collect();
1401 let training_readiness: Int32Array = records.iter().map(|r| r.training_readiness).collect();
1402 let training_status: StringArray = records
1403 .iter()
1404 .map(|r| r.training_status.as_deref())
1405 .collect();
1406 let lactate_threshold_hr: Int32Array =
1407 records.iter().map(|r| r.lactate_threshold_hr).collect();
1408 let lactate_threshold_pace: Float64Array =
1409 records.iter().map(|r| r.lactate_threshold_pace).collect();
1410 let race_5k_sec: Int32Array = records.iter().map(|r| r.race_5k_sec).collect();
1411 let race_10k_sec: Int32Array = records.iter().map(|r| r.race_10k_sec).collect();
1412 let race_half_sec: Int32Array = records.iter().map(|r| r.race_half_sec).collect();
1413 let race_marathon_sec: Int32Array = records.iter().map(|r| r.race_marathon_sec).collect();
1414 let endurance_score: Int32Array = records.iter().map(|r| r.endurance_score).collect();
1415 let hill_score: Int32Array = records.iter().map(|r| r.hill_score).collect();
1416 let raw_json: StringArray = records
1417 .iter()
1418 .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
1419 .collect();
1420
1421 let schema = Arc::new(Schema::new(vec![
1422 Field::new("id", DataType::Int64, true),
1423 Field::new("profile_id", DataType::Int32, false),
1424 Field::new("date", DataType::Date32, false),
1425 Field::new("vo2max", DataType::Float64, true),
1426 Field::new("fitness_age", DataType::Int32, true),
1427 Field::new("training_readiness", DataType::Int32, true),
1428 Field::new("training_status", DataType::Utf8, true),
1429 Field::new("lactate_threshold_hr", DataType::Int32, true),
1430 Field::new("lactate_threshold_pace", DataType::Float64, true),
1431 Field::new("race_5k_sec", DataType::Int32, true),
1432 Field::new("race_10k_sec", DataType::Int32, true),
1433 Field::new("race_half_sec", DataType::Int32, true),
1434 Field::new("race_marathon_sec", DataType::Int32, true),
1435 Field::new("endurance_score", DataType::Int32, true),
1436 Field::new("hill_score", DataType::Int32, true),
1437 Field::new("raw_json", DataType::Utf8, true),
1438 ]));
1439
1440 RecordBatch::try_new(
1441 schema,
1442 vec![
1443 Arc::new(id),
1444 Arc::new(profile_id),
1445 Arc::new(date),
1446 Arc::new(vo2max),
1447 Arc::new(fitness_age),
1448 Arc::new(training_readiness),
1449 Arc::new(training_status),
1450 Arc::new(lactate_threshold_hr),
1451 Arc::new(lactate_threshold_pace),
1452 Arc::new(race_5k_sec),
1453 Arc::new(race_10k_sec),
1454 Arc::new(race_half_sec),
1455 Arc::new(race_marathon_sec),
1456 Arc::new(endurance_score),
1457 Arc::new(hill_score),
1458 Arc::new(raw_json),
1459 ],
1460 )
1461 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1462 }
1463
1464 fn batch_to_performance_metrics(batch: &RecordBatch) -> Result<Vec<PerformanceMetrics>> {
1465 let len = batch.num_rows();
1466 let mut records = Vec::with_capacity(len);
1467
1468 let id = batch
1469 .column(0)
1470 .as_any()
1471 .downcast_ref::<Int64Array>()
1472 .unwrap();
1473 let profile_id = batch
1474 .column(1)
1475 .as_any()
1476 .downcast_ref::<Int32Array>()
1477 .unwrap();
1478 let date = batch
1479 .column(2)
1480 .as_any()
1481 .downcast_ref::<Date32Array>()
1482 .unwrap();
1483 let vo2max = batch
1484 .column(3)
1485 .as_any()
1486 .downcast_ref::<Float64Array>()
1487 .unwrap();
1488 let fitness_age = batch
1489 .column(4)
1490 .as_any()
1491 .downcast_ref::<Int32Array>()
1492 .unwrap();
1493 let training_readiness = batch
1494 .column(5)
1495 .as_any()
1496 .downcast_ref::<Int32Array>()
1497 .unwrap();
1498 let training_status = batch
1499 .column(6)
1500 .as_any()
1501 .downcast_ref::<StringArray>()
1502 .unwrap();
1503 let lactate_threshold_hr = batch
1504 .column(7)
1505 .as_any()
1506 .downcast_ref::<Int32Array>()
1507 .unwrap();
1508 let lactate_threshold_pace = batch
1509 .column(8)
1510 .as_any()
1511 .downcast_ref::<Float64Array>()
1512 .unwrap();
1513 let race_5k_sec = batch
1514 .column(9)
1515 .as_any()
1516 .downcast_ref::<Int32Array>()
1517 .unwrap();
1518 let race_10k_sec = batch
1519 .column(10)
1520 .as_any()
1521 .downcast_ref::<Int32Array>()
1522 .unwrap();
1523 let race_half_sec = batch
1524 .column(11)
1525 .as_any()
1526 .downcast_ref::<Int32Array>()
1527 .unwrap();
1528 let race_marathon_sec = batch
1529 .column(12)
1530 .as_any()
1531 .downcast_ref::<Int32Array>()
1532 .unwrap();
1533 let endurance_score = batch
1534 .column(13)
1535 .as_any()
1536 .downcast_ref::<Int32Array>()
1537 .unwrap();
1538 let hill_score = batch
1539 .column(14)
1540 .as_any()
1541 .downcast_ref::<Int32Array>()
1542 .unwrap();
1543 let raw_json = batch
1544 .column(15)
1545 .as_any()
1546 .downcast_ref::<StringArray>()
1547 .unwrap();
1548
1549 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1550
1551 for i in 0..len {
1552 records.push(PerformanceMetrics {
1553 id: id.is_valid(i).then(|| id.value(i)),
1554 profile_id: profile_id.value(i),
1555 date: epoch + chrono::Duration::days(date.value(i) as i64),
1556 vo2max: vo2max.is_valid(i).then(|| vo2max.value(i)),
1557 fitness_age: fitness_age.is_valid(i).then(|| fitness_age.value(i)),
1558 training_readiness: training_readiness
1559 .is_valid(i)
1560 .then(|| training_readiness.value(i)),
1561 training_status: training_status
1562 .is_valid(i)
1563 .then(|| training_status.value(i).to_string()),
1564 lactate_threshold_hr: lactate_threshold_hr
1565 .is_valid(i)
1566 .then(|| lactate_threshold_hr.value(i)),
1567 lactate_threshold_pace: lactate_threshold_pace
1568 .is_valid(i)
1569 .then(|| lactate_threshold_pace.value(i)),
1570 race_5k_sec: race_5k_sec.is_valid(i).then(|| race_5k_sec.value(i)),
1571 race_10k_sec: race_10k_sec.is_valid(i).then(|| race_10k_sec.value(i)),
1572 race_half_sec: race_half_sec.is_valid(i).then(|| race_half_sec.value(i)),
1573 race_marathon_sec: race_marathon_sec
1574 .is_valid(i)
1575 .then(|| race_marathon_sec.value(i)),
1576 endurance_score: endurance_score
1577 .is_valid(i)
1578 .then(|| endurance_score.value(i)),
1579 hill_score: hill_score.is_valid(i).then(|| hill_score.value(i)),
1580 raw_json: raw_json
1581 .is_valid(i)
1582 .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
1583 });
1584 }
1585
1586 Ok(records)
1587 }
1588
1589 pub fn upsert_weight(&self, records: &[WeightEntry]) -> Result<()> {
1595 self.ensure_dir(EntityType::Weight)?;
1596
1597 let mut partitions: std::collections::HashMap<String, Vec<WeightEntry>> =
1599 std::collections::HashMap::new();
1600
1601 for record in records {
1602 let key = EntityType::Weight.partition_key(record.date);
1603 partitions.entry(key).or_default().push(record.clone());
1604 }
1605
1606 for (key, mut new_records) in partitions {
1608 let path = self.partition_path(EntityType::Weight, &key);
1609
1610 let mut existing = self.read_weight_from_path(&path)?;
1612
1613 let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1615 new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1616
1617 existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1619
1620 existing.append(&mut new_records);
1622
1623 existing.sort_by_key(|r| (r.profile_id, r.date));
1625
1626 let refs: Vec<&WeightEntry> = existing.iter().collect();
1628 let batch = Self::weight_to_batch(refs)?;
1629 self.write_batch(&path, &batch)?;
1630 }
1631
1632 Ok(())
1633 }
1634
1635 fn read_weight_from_path(&self, path: &Path) -> Result<Vec<WeightEntry>> {
1636 let batches = self.read_batches(path)?;
1637 let mut records = Vec::new();
1638
1639 for batch in batches {
1640 records.extend(Self::batch_to_weight(&batch)?);
1641 }
1642
1643 Ok(records)
1644 }
1645
1646 fn weight_to_batch(records: Vec<&WeightEntry>) -> Result<RecordBatch> {
1647 let id: Int64Array = records.iter().map(|r| r.id).collect();
1648 let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
1649 let date: Date32Array = records
1650 .iter()
1651 .map(|r| {
1652 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1653 Some((r.date - epoch).num_days() as i32)
1654 })
1655 .collect();
1656 let weight_kg: Float64Array = records.iter().map(|r| r.weight_kg).collect();
1657 let bmi: Float64Array = records.iter().map(|r| r.bmi).collect();
1658 let body_fat_pct: Float64Array = records.iter().map(|r| r.body_fat_pct).collect();
1659 let muscle_mass_kg: Float64Array = records.iter().map(|r| r.muscle_mass_kg).collect();
1660
1661 let schema = Arc::new(Schema::new(vec![
1662 Field::new("id", DataType::Int64, true),
1663 Field::new("profile_id", DataType::Int32, false),
1664 Field::new("date", DataType::Date32, false),
1665 Field::new("weight_kg", DataType::Float64, true),
1666 Field::new("bmi", DataType::Float64, true),
1667 Field::new("body_fat_pct", DataType::Float64, true),
1668 Field::new("muscle_mass_kg", DataType::Float64, true),
1669 ]));
1670
1671 RecordBatch::try_new(
1672 schema,
1673 vec![
1674 Arc::new(id),
1675 Arc::new(profile_id),
1676 Arc::new(date),
1677 Arc::new(weight_kg),
1678 Arc::new(bmi),
1679 Arc::new(body_fat_pct),
1680 Arc::new(muscle_mass_kg),
1681 ],
1682 )
1683 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1684 }
1685
1686 fn batch_to_weight(batch: &RecordBatch) -> Result<Vec<WeightEntry>> {
1687 let len = batch.num_rows();
1688 let mut records = Vec::with_capacity(len);
1689
1690 let id = batch
1691 .column(0)
1692 .as_any()
1693 .downcast_ref::<Int64Array>()
1694 .unwrap();
1695 let profile_id = batch
1696 .column(1)
1697 .as_any()
1698 .downcast_ref::<Int32Array>()
1699 .unwrap();
1700 let date = batch
1701 .column(2)
1702 .as_any()
1703 .downcast_ref::<Date32Array>()
1704 .unwrap();
1705 let weight_kg = batch
1706 .column(3)
1707 .as_any()
1708 .downcast_ref::<Float64Array>()
1709 .unwrap();
1710 let bmi = batch
1711 .column(4)
1712 .as_any()
1713 .downcast_ref::<Float64Array>()
1714 .unwrap();
1715 let body_fat_pct = batch
1716 .column(5)
1717 .as_any()
1718 .downcast_ref::<Float64Array>()
1719 .unwrap();
1720 let muscle_mass_kg = batch
1721 .column(6)
1722 .as_any()
1723 .downcast_ref::<Float64Array>()
1724 .unwrap();
1725
1726 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1727
1728 for i in 0..len {
1729 records.push(WeightEntry {
1730 id: id.is_valid(i).then(|| id.value(i)),
1731 profile_id: profile_id.value(i),
1732 date: epoch + chrono::Duration::days(date.value(i) as i64),
1733 weight_kg: weight_kg.is_valid(i).then(|| weight_kg.value(i)),
1734 bmi: bmi.is_valid(i).then(|| bmi.value(i)),
1735 body_fat_pct: body_fat_pct.is_valid(i).then(|| body_fat_pct.value(i)),
1736 muscle_mass_kg: muscle_mass_kg.is_valid(i).then(|| muscle_mass_kg.value(i)),
1737 });
1738 }
1739
1740 Ok(records)
1741 }
1742
1743 pub fn write_profiles(&self, profiles: &[Profile]) -> Result<()> {
1749 fs::create_dir_all(&self.base_path)
1750 .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
1751
1752 let path = self.partition_path(EntityType::Profiles, "");
1753 let refs: Vec<&Profile> = profiles.iter().collect();
1754 let batch = Self::profiles_to_batch(refs)?;
1755 self.write_batch(&path, &batch)?;
1756
1757 Ok(())
1758 }
1759
1760 pub fn read_profiles(&self) -> Result<Vec<Profile>> {
1762 let path = self.partition_path(EntityType::Profiles, "");
1763 let batches = self.read_batches(&path)?;
1764 let mut profiles = Vec::new();
1765
1766 for batch in batches {
1767 profiles.extend(Self::batch_to_profiles(&batch)?);
1768 }
1769
1770 Ok(profiles)
1771 }
1772
1773 fn profiles_to_batch(profiles: Vec<&Profile>) -> Result<RecordBatch> {
1774 let profile_id: Int32Array = profiles.iter().map(|p| p.profile_id).collect();
1775 let display_name: StringArray = profiles
1776 .iter()
1777 .map(|p| Some(p.display_name.as_str()))
1778 .collect();
1779 let user_id: Int64Array = profiles.iter().map(|p| p.user_id).collect();
1780 let created_at: TimestampMicrosecondArray = profiles
1781 .iter()
1782 .map(|p| p.created_at.map(|t| t.timestamp_micros()))
1783 .collect();
1784 let last_sync_at: TimestampMicrosecondArray = profiles
1785 .iter()
1786 .map(|p| p.last_sync_at.map(|t| t.timestamp_micros()))
1787 .collect();
1788
1789 let schema = Arc::new(Schema::new(vec![
1790 Field::new("profile_id", DataType::Int32, false),
1791 Field::new("display_name", DataType::Utf8, false),
1792 Field::new("user_id", DataType::Int64, true),
1793 Field::new(
1794 "created_at",
1795 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1796 true,
1797 ),
1798 Field::new(
1799 "last_sync_at",
1800 DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1801 true,
1802 ),
1803 ]));
1804
1805 RecordBatch::try_new(
1806 schema,
1807 vec![
1808 Arc::new(profile_id),
1809 Arc::new(display_name),
1810 Arc::new(user_id),
1811 Arc::new(created_at),
1812 Arc::new(last_sync_at),
1813 ],
1814 )
1815 .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1816 }
1817
1818 fn batch_to_profiles(batch: &RecordBatch) -> Result<Vec<Profile>> {
1819 let len = batch.num_rows();
1820 let mut profiles = Vec::with_capacity(len);
1821
1822 let profile_id = batch
1823 .column(0)
1824 .as_any()
1825 .downcast_ref::<Int32Array>()
1826 .unwrap();
1827 let display_name = batch
1828 .column(1)
1829 .as_any()
1830 .downcast_ref::<StringArray>()
1831 .unwrap();
1832 let user_id = batch
1833 .column(2)
1834 .as_any()
1835 .downcast_ref::<Int64Array>()
1836 .unwrap();
1837 let created_at = batch
1838 .column(3)
1839 .as_any()
1840 .downcast_ref::<TimestampMicrosecondArray>()
1841 .unwrap();
1842 let last_sync_at = batch
1843 .column(4)
1844 .as_any()
1845 .downcast_ref::<TimestampMicrosecondArray>()
1846 .unwrap();
1847
1848 for i in 0..len {
1849 profiles.push(Profile {
1850 profile_id: profile_id.value(i),
1851 display_name: display_name.value(i).to_string(),
1852 user_id: user_id.is_valid(i).then(|| user_id.value(i)),
1853 created_at: created_at.is_valid(i).then(|| {
1854 DateTime::from_timestamp_micros(created_at.value(i)).unwrap_or_default()
1855 }),
1856 last_sync_at: last_sync_at.is_valid(i).then(|| {
1857 DateTime::from_timestamp_micros(last_sync_at.value(i)).unwrap_or_default()
1858 }),
1859 });
1860 }
1861
1862 Ok(profiles)
1863 }
1864
1865 pub fn base_path(&self) -> &Path {
1867 &self.base_path
1868 }
1869}
1870
1871#[cfg(test)]
1872mod tests {
1873 use super::*;
1874 use tempfile::TempDir;
1875
1876 #[test]
1877 fn test_activity_round_trip() {
1878 let temp = TempDir::new().unwrap();
1879 let store = ParquetStore::new(temp.path());
1880
1881 let activities = vec![Activity {
1882 activity_id: 123,
1883 profile_id: 1,
1884 activity_name: Some("Morning Run".to_string()),
1885 activity_type: Some("running".to_string()),
1886 start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
1887 start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
1888 duration_sec: Some(3600.0),
1889 distance_m: Some(10000.0),
1890 calories: Some(500),
1891 avg_hr: Some(150),
1892 max_hr: Some(175),
1893 avg_speed: Some(2.78),
1894 max_speed: Some(3.5),
1895 elevation_gain: Some(100.0),
1896 elevation_loss: Some(100.0),
1897 avg_cadence: Some(180.0),
1898 avg_power: None,
1899 normalized_power: None,
1900 training_effect: Some(3.5),
1901 training_load: Some(120.0),
1902 start_lat: Some(37.7749),
1903 start_lon: Some(-122.4194),
1904 end_lat: Some(37.7849),
1905 end_lon: Some(-122.4094),
1906 ground_contact_time: Some(250.0),
1907 vertical_oscillation: Some(8.5),
1908 stride_length: Some(1.2),
1909 location_name: Some("San Francisco".to_string()),
1910 raw_json: None,
1911 }];
1912
1913 store.upsert_activities(&activities).unwrap();
1914
1915 let key = EntityType::Activities
1917 .partition_key(activities[0].start_time_local.unwrap().date_naive());
1918 let path = store.partition_path(EntityType::Activities, &key);
1919 let read_back = store.read_activities_from_path(&path).unwrap();
1920
1921 assert_eq!(read_back.len(), 1);
1922 assert_eq!(read_back[0].activity_id, 123);
1923 assert_eq!(read_back[0].activity_name, Some("Morning Run".to_string()));
1924 }
1925
1926 #[test]
1927 fn test_daily_health_round_trip() {
1928 let temp = TempDir::new().unwrap();
1929 let store = ParquetStore::new(temp.path());
1930
1931 let records = vec![DailyHealth {
1932 id: None,
1933 profile_id: 1,
1934 date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
1935 steps: Some(10000),
1936 step_goal: Some(8000),
1937 total_calories: Some(2500),
1938 active_calories: Some(500),
1939 bmr_calories: Some(2000),
1940 resting_hr: Some(55),
1941 sleep_seconds: Some(28800),
1942 deep_sleep_seconds: Some(7200),
1943 light_sleep_seconds: Some(14400),
1944 rem_sleep_seconds: Some(7200),
1945 sleep_score: Some(85),
1946 sleep_note: Some("Felt rested.".to_string()),
1947 avg_stress: Some(30),
1948 max_stress: Some(75),
1949 body_battery_start: Some(95),
1950 body_battery_end: Some(45),
1951 hrv_weekly_avg: Some(45),
1952 hrv_last_night: Some(48),
1953 hrv_status: Some("balanced".to_string()),
1954 avg_respiration: Some(14.5),
1955 avg_spo2: Some(96),
1956 lowest_spo2: Some(92),
1957 hydration_ml: Some(2500),
1958 moderate_intensity_min: Some(30),
1959 vigorous_intensity_min: Some(20),
1960 raw_json: None,
1961 }];
1962
1963 store.upsert_daily_health(&records).unwrap();
1964
1965 let key = EntityType::DailyHealth.partition_key(records[0].date);
1967 let path = store.partition_path(EntityType::DailyHealth, &key);
1968 let read_back = store.read_daily_health_from_path(&path).unwrap();
1969
1970 assert_eq!(read_back.len(), 1);
1971 assert_eq!(read_back[0].steps, Some(10000));
1972 assert_eq!(read_back[0].sleep_note.as_deref(), Some("Felt rested."));
1973 assert_eq!(
1974 read_back[0].date,
1975 NaiveDate::from_ymd_opt(2024, 12, 15).unwrap()
1976 );
1977 }
1978
1979 #[test]
1980 fn test_has_daily_health() {
1981 let temp = TempDir::new().unwrap();
1982 let store = ParquetStore::new(temp.path());
1983 let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();
1984
1985 assert!(!store.has_daily_health(1, date).unwrap());
1986
1987 let records = vec![DailyHealth {
1988 id: None,
1989 profile_id: 1,
1990 date,
1991 steps: Some(10000),
1992 step_goal: None,
1993 total_calories: None,
1994 active_calories: None,
1995 bmr_calories: None,
1996 resting_hr: None,
1997 sleep_seconds: None,
1998 deep_sleep_seconds: None,
1999 light_sleep_seconds: None,
2000 rem_sleep_seconds: None,
2001 sleep_score: None,
2002 sleep_note: None,
2003 avg_stress: None,
2004 max_stress: None,
2005 body_battery_start: None,
2006 body_battery_end: None,
2007 hrv_weekly_avg: None,
2008 hrv_last_night: None,
2009 hrv_status: None,
2010 avg_respiration: None,
2011 avg_spo2: None,
2012 lowest_spo2: None,
2013 hydration_ml: None,
2014 moderate_intensity_min: None,
2015 vigorous_intensity_min: None,
2016 raw_json: None,
2017 }];
2018
2019 store.upsert_daily_health(&records).unwrap();
2020 assert!(store.has_daily_health(1, date).unwrap());
2021 }
2022
2023 #[test]
2024 fn test_has_track_points() {
2025 let temp = TempDir::new().unwrap();
2026 let store = ParquetStore::new(temp.path());
2027 let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();
2028
2029 assert!(!store.has_track_points(42, date).unwrap());
2030
2031 let points = vec![TrackPoint {
2032 id: None,
2033 activity_id: 42,
2034 timestamp: DateTime::from_timestamp(1703001600, 0).unwrap(),
2035 lat: Some(1.0),
2036 lon: Some(2.0),
2037 elevation: Some(3.0),
2038 heart_rate: None,
2039 cadence: None,
2040 power: None,
2041 speed: None,
2042 }];
2043
2044 store.write_track_points(date, &points).unwrap();
2045 assert!(store.has_track_points(42, date).unwrap());
2046 }
2047
#[test]
fn test_concurrent_read_write() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    let temp = TempDir::new().unwrap();
    let store = Arc::new(ParquetStore::new(temp.path()));

    // Seed the store with one activity dated 2023-12-19 so the reader has
    // something to observe.
    let initial_activities = vec![Activity {
        activity_id: 1,
        profile_id: 1,
        activity_name: Some("Initial Activity".to_string()),
        activity_type: Some("running".to_string()),
        start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
        start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
        duration_sec: Some(3600.0),
        distance_m: Some(10000.0),
        calories: None,
        avg_hr: None,
        max_hr: None,
        avg_speed: None,
        max_speed: None,
        elevation_gain: None,
        elevation_loss: None,
        avg_cadence: None,
        avg_power: None,
        normalized_power: None,
        training_effect: None,
        training_load: None,
        start_lat: None,
        start_lon: None,
        end_lat: None,
        end_lon: None,
        ground_contact_time: None,
        vertical_oscillation: None,
        stride_length: None,
        location_name: None,
        raw_json: None,
    }];
    store.upsert_activities(&initial_activities).unwrap();

    // Both threads rendezvous on this barrier so the read and the write
    // start at (approximately) the same time.
    let barrier = Arc::new(Barrier::new(2));
    let store_reader = Arc::clone(&store);
    let barrier_reader = Arc::clone(&barrier);

    let reader_handle = thread::spawn(move || {
        barrier_reader.wait();
        let key = EntityType::Activities
            .partition_key(NaiveDate::from_ymd_opt(2023, 12, 19).unwrap());
        let path = store_reader.partition_path(EntityType::Activities, &key);
        let activities = store_reader.read_activities_from_path(&path).unwrap();

        assert!(!activities.is_empty(), "Reader should see initial data");
        activities.len()
    });

    // The writer gets its own handle so `store` stays usable for the
    // post-join verification below.
    let store_writer = Arc::clone(&store);
    let writer_handle = thread::spawn(move || {
        barrier.wait();
        // Activity 2 is dated 2023-12-26. NOTE(review): the `read_count == 1`
        // assertion below assumes this date maps to a different partition
        // than the reader's 2023-12-19 key; otherwise the read would race.
        let new_activities = vec![Activity {
            activity_id: 2,
            profile_id: 1,
            activity_name: Some("Concurrent Activity".to_string()),
            activity_type: Some("cycling".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
            duration_sec: Some(7200.0),
            distance_m: Some(50000.0),
            calories: None,
            avg_hr: None,
            max_hr: None,
            avg_speed: None,
            max_speed: None,
            elevation_gain: None,
            elevation_loss: None,
            avg_cadence: None,
            avg_power: None,
            normalized_power: None,
            training_effect: None,
            training_load: None,
            start_lat: None,
            start_lon: None,
            end_lat: None,
            end_lon: None,
            ground_contact_time: None,
            vertical_oscillation: None,
            stride_length: None,
            location_name: None,
            raw_json: None,
        }];
        store_writer.upsert_activities(&new_activities).unwrap();
        // Report the id that was actually written rather than a literal, so
        // the assertion below is not tautological.
        new_activities[0].activity_id
    });

    let read_count = reader_handle.join().expect("Reader thread panicked");
    let written_id = writer_handle.join().expect("Writer thread panicked");

    assert_eq!(read_count, 1, "Reader should have read 1 activity");
    assert_eq!(written_id, 2, "Writer should have written activity 2");

    // Read the writer's partition back to prove the concurrent write was
    // persisted, not merely that the writer thread did not panic.
    let written_key = EntityType::Activities
        .partition_key(NaiveDate::from_ymd_opt(2023, 12, 26).unwrap());
    let written_path = store.partition_path(EntityType::Activities, &written_key);
    let persisted = store.read_activities_from_path(&written_path).unwrap();
    assert!(
        persisted.iter().any(|a| a.activity_id == written_id),
        "Activity 2 should be readable after the writer finishes"
    );
}
2157
#[test]
fn test_duckdb_glob_query() {
    use duckdb::Connection;

    let temp = TempDir::new().unwrap();
    let store = ParquetStore::new(temp.path());

    // Two activities one week apart (2023-12-19 and 2023-12-26), named for
    // ISO weeks 51 and 52, so the glob query has to read both back.
    let activities = vec![
        Activity {
            activity_id: 1,
            profile_id: 1,
            activity_name: Some("Week 51 Run".to_string()),
            activity_type: Some("running".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            duration_sec: Some(3600.0),
            distance_m: Some(10000.0),
            calories: None,
            avg_hr: None,
            max_hr: None,
            avg_speed: None,
            max_speed: None,
            elevation_gain: None,
            elevation_loss: None,
            avg_cadence: None,
            avg_power: None,
            normalized_power: None,
            training_effect: None,
            training_load: None,
            start_lat: None,
            start_lon: None,
            end_lat: None,
            end_lon: None,
            ground_contact_time: None,
            vertical_oscillation: None,
            stride_length: None,
            location_name: None,
            raw_json: None,
        },
        Activity {
            activity_id: 2,
            profile_id: 1,
            activity_name: Some("Week 52 Ride".to_string()),
            activity_type: Some("cycling".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
            duration_sec: Some(7200.0),
            distance_m: Some(50000.0),
            calories: None,
            avg_hr: None,
            max_hr: None,
            avg_speed: None,
            max_speed: None,
            elevation_gain: None,
            elevation_loss: None,
            avg_cadence: None,
            avg_power: None,
            normalized_power: None,
            training_effect: None,
            training_load: None,
            start_lat: None,
            start_lon: None,
            end_lat: None,
            end_lon: None,
            ground_contact_time: None,
            vertical_oscillation: None,
            stride_length: None,
            location_name: None,
            raw_json: None,
        },
    ];
    store.upsert_activities(&activities).unwrap();

    let conn = Connection::open_in_memory().unwrap();
    // DuckDB reads every file matching a quoted glob pattern as one table.
    let glob_pattern = format!("{}/*.parquet", temp.path().join("activities").display());
    // The pattern is spliced into a SQL string literal, so double any single
    // quote (e.g. one coming from the temp-dir path) to keep it well-formed.
    let sql_literal = glob_pattern.replace('\'', "''");

    let mut stmt = conn
        .prepare(&format!(
            "SELECT activity_id, activity_name FROM '{}' ORDER BY activity_id",
            sql_literal
        ))
        .unwrap();

    // Fail loudly on a row-decoding error instead of silently dropping the
    // row, which would otherwise surface only as a confusing length mismatch.
    let results: Vec<(i64, String)> = stmt
        .query_map([], |row| {
            Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
        })
        .unwrap()
        .map(|decoded| decoded.expect("failed to decode result row"))
        .collect();

    assert_eq!(
        results,
        vec![
            (1, "Week 51 Run".to_string()),
            (2, "Week 52 Ride".to_string()),
        ]
    );
}
2256}