// garmin_cli/storage/parquet.rs
1//! Parquet read/write utilities for time-partitioned storage
2//!
3//! Uses Arrow record batches for efficient columnar storage.
4//! Supports concurrent writes to different partitions via partition-level locks.
5
6use std::fs::{self, File};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::*;
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow::record_batch::RecordBatch;
13use chrono::{DateTime, NaiveDate};
14use dashmap::DashMap;
15use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
16use parquet::arrow::ArrowWriter;
17use parquet::basic::Compression;
18use parquet::file::properties::WriterProperties;
19use tokio::sync::Mutex as TokioMutex;
20
21use crate::db::models::{
22    Activity, DailyHealth, PerformanceMetrics, Profile, TrackPoint, WeightEntry,
23};
24use crate::error::{GarminError, Result};
25
26use super::partitions::EntityType;
27
/// Parquet storage for Garmin data
///
/// Supports concurrent writes to different partitions. Each partition has its own
/// lock to prevent lost updates when multiple workers write to the same partition.
///
/// Cloning is cheap: clones share the same lock registry through the `Arc`,
/// which is what makes the partition locks effective across clones.
#[derive(Clone)]
pub struct ParquetStore {
    /// Root directory: holds `profiles.parquet` plus one subdirectory per
    /// partitioned entity type (see `partition_path`).
    base_path: PathBuf,
    /// Per-partition locks for concurrent write safety
    /// (created lazily in `get_partition_lock`, never removed).
    partition_locks: Arc<DashMap<String, Arc<TokioMutex<()>>>>,
}
38
39impl ParquetStore {
40    /// Create a new ParquetStore at the given base path
41    pub fn new(base_path: impl Into<PathBuf>) -> Self {
42        Self {
43            base_path: base_path.into(),
44            partition_locks: Arc::new(DashMap::new()),
45        }
46    }
47
48    /// Get or create a lock for a specific partition
49    fn get_partition_lock(&self, partition_key: &str) -> Arc<TokioMutex<()>> {
50        self.partition_locks
51            .entry(partition_key.to_string())
52            .or_insert_with(|| Arc::new(TokioMutex::new(())))
53            .clone()
54    }
55
56    /// Check if daily health data exists for a profile/date
57    pub fn has_daily_health(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
58        let key = EntityType::DailyHealth.partition_key(date);
59        let path = self.partition_path(EntityType::DailyHealth, &key);
60        if !path.exists() {
61            return Ok(false);
62        }
63        let records = self.read_daily_health_from_path(&path)?;
64        Ok(records
65            .iter()
66            .any(|r| r.profile_id == profile_id && r.date == date))
67    }
68
69    /// Check if performance metrics exist for a profile/date
70    pub fn has_performance_metrics(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
71        let key = EntityType::PerformanceMetrics.partition_key(date);
72        let path = self.partition_path(EntityType::PerformanceMetrics, &key);
73        if !path.exists() {
74            return Ok(false);
75        }
76        let records = self.read_performance_metrics_from_path(&path)?;
77        Ok(records
78            .iter()
79            .any(|r| r.profile_id == profile_id && r.date == date))
80    }
81
82    /// Check if track points exist for an activity/date partition
83    pub fn has_track_points(&self, activity_id: i64, date: NaiveDate) -> Result<bool> {
84        let key = EntityType::TrackPoints.partition_key(date);
85        let path = self.partition_path(EntityType::TrackPoints, &key);
86        if !path.exists() {
87            return Ok(false);
88        }
89        let records = self.read_track_points_from_path(&path)?;
90        Ok(records.iter().any(|r| r.activity_id == activity_id))
91    }
92
93    /// Get the full path for a partition file
94    pub fn partition_path(&self, entity: EntityType, partition_key: &str) -> PathBuf {
95        match entity {
96            EntityType::Profiles => self.base_path.join("profiles.parquet"),
97            _ => self
98                .base_path
99                .join(entity.dir_name())
100                .join(format!("{}.parquet", partition_key)),
101        }
102    }
103
104    /// Ensure the directory for an entity exists
105    fn ensure_dir(&self, entity: EntityType) -> Result<()> {
106        if entity != EntityType::Profiles {
107            let dir = self.base_path.join(entity.dir_name());
108            fs::create_dir_all(&dir).map_err(|e| {
109                GarminError::Database(format!("Failed to create directory {:?}: {}", dir, e))
110            })?;
111        } else {
112            fs::create_dir_all(&self.base_path).map_err(|e| {
113                GarminError::Database(format!(
114                    "Failed to create directory {:?}: {}",
115                    self.base_path, e
116                ))
117            })?;
118        }
119        Ok(())
120    }
121
122    /// Write a record batch to a partition file atomically
123    fn write_batch(&self, path: &Path, batch: &RecordBatch) -> Result<()> {
124        // Write to temp file first
125        let temp_path = path.with_extension("parquet.tmp");
126
127        // Ensure parent directory exists
128        if let Some(parent) = path.parent() {
129            fs::create_dir_all(parent)
130                .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
131        }
132
133        let file = File::create(&temp_path)
134            .map_err(|e| GarminError::Database(format!("Failed to create temp file: {}", e)))?;
135
136        let props = WriterProperties::builder()
137            .set_compression(Compression::ZSTD(Default::default()))
138            .build();
139
140        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
141            GarminError::Database(format!("Failed to create Parquet writer: {}", e))
142        })?;
143
144        writer
145            .write(batch)
146            .map_err(|e| GarminError::Database(format!("Failed to write batch: {}", e)))?;
147
148        writer
149            .close()
150            .map_err(|e| GarminError::Database(format!("Failed to close writer: {}", e)))?;
151
152        // Atomic rename
153        fs::rename(&temp_path, path)
154            .map_err(|e| GarminError::Database(format!("Failed to rename temp file: {}", e)))?;
155
156        Ok(())
157    }
158
159    /// Read all record batches from a partition file
160    fn read_batches(&self, path: &Path) -> Result<Vec<RecordBatch>> {
161        if !path.exists() {
162            return Ok(Vec::new());
163        }
164
165        let file = File::open(path)
166            .map_err(|e| GarminError::Database(format!("Failed to open file: {}", e)))?;
167
168        let reader = ParquetRecordBatchReaderBuilder::try_new(file)
169            .map_err(|e| GarminError::Database(format!("Failed to create reader: {}", e)))?
170            .build()
171            .map_err(|e| GarminError::Database(format!("Failed to build reader: {}", e)))?;
172
173        reader
174            .collect::<std::result::Result<Vec<_>, _>>()
175            .map_err(|e| GarminError::Database(format!("Failed to read batches: {}", e)))
176    }
177
178    // =========================================================================
179    // Activities
180    // =========================================================================
181
182    /// Write activities to weekly partitions
183    pub fn write_activities(&self, activities: &[Activity]) -> Result<()> {
184        self.ensure_dir(EntityType::Activities)?;
185
186        // Group by partition key
187        let mut partitions: std::collections::HashMap<String, Vec<&Activity>> =
188            std::collections::HashMap::new();
189
190        for activity in activities {
191            if let Some(start_time) = activity.start_time_local {
192                let key = EntityType::Activities.partition_key(start_time.date_naive());
193                partitions.entry(key).or_default().push(activity);
194            }
195        }
196
197        // Write each partition
198        for (key, partition_activities) in partitions {
199            let path = self.partition_path(EntityType::Activities, &key);
200            let batch = Self::activities_to_batch(partition_activities)?;
201            self.write_batch(&path, &batch)?;
202        }
203
204        Ok(())
205    }
206
207    /// Upsert activities (read existing, merge, write)
208    pub fn upsert_activities(&self, activities: &[Activity]) -> Result<()> {
209        self.ensure_dir(EntityType::Activities)?;
210
211        // Group new activities by partition key
212        let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
213            std::collections::HashMap::new();
214
215        for activity in activities {
216            if let Some(start_time) = activity.start_time_local {
217                let key = EntityType::Activities.partition_key(start_time.date_naive());
218                partitions.entry(key).or_default().push(activity.clone());
219            }
220        }
221
222        // For each partition, read existing and merge
223        for (key, mut new_activities) in partitions {
224            let path = self.partition_path(EntityType::Activities, &key);
225
226            // Read existing
227            let mut existing = self.read_activities_from_path(&path)?;
228
229            // Create set of new activity IDs for fast lookup
230            let new_ids: std::collections::HashSet<i64> =
231                new_activities.iter().map(|a| a.activity_id).collect();
232
233            // Keep existing activities that aren't being replaced
234            existing.retain(|a| !new_ids.contains(&a.activity_id));
235
236            // Merge
237            existing.append(&mut new_activities);
238
239            // Sort by activity_id for consistent ordering
240            existing.sort_by_key(|a| a.activity_id);
241
242            // Write merged
243            let refs: Vec<&Activity> = existing.iter().collect();
244            let batch = Self::activities_to_batch(refs)?;
245            self.write_batch(&path, &batch)?;
246        }
247
248        Ok(())
249    }
250
251    /// Upsert activities with partition-level locking for concurrent writes
252    pub async fn upsert_activities_async(&self, activities: &[Activity]) -> Result<()> {
253        self.ensure_dir(EntityType::Activities)?;
254
255        // Group new activities by partition key
256        let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
257            std::collections::HashMap::new();
258
259        for activity in activities {
260            if let Some(start_time) = activity.start_time_local {
261                let key = EntityType::Activities.partition_key(start_time.date_naive());
262                partitions.entry(key).or_default().push(activity.clone());
263            }
264        }
265
266        // For each partition, acquire lock then read/merge/write
267        for (key, mut new_activities) in partitions {
268            // Acquire partition lock
269            let lock = self.get_partition_lock(&key);
270            let _guard = lock.lock().await;
271
272            let path = self.partition_path(EntityType::Activities, &key);
273
274            // Read existing
275            let mut existing = self.read_activities_from_path(&path)?;
276
277            // Create set of new activity IDs for fast lookup
278            let new_ids: std::collections::HashSet<i64> =
279                new_activities.iter().map(|a| a.activity_id).collect();
280
281            // Keep existing activities that aren't being replaced
282            existing.retain(|a| !new_ids.contains(&a.activity_id));
283
284            // Merge
285            existing.append(&mut new_activities);
286
287            // Sort by activity_id for consistent ordering
288            existing.sort_by_key(|a| a.activity_id);
289
290            // Write merged
291            let refs: Vec<&Activity> = existing.iter().collect();
292            let batch = Self::activities_to_batch(refs)?;
293            self.write_batch(&path, &batch)?;
294        }
295
296        Ok(())
297    }
298
299    fn read_activities_from_path(&self, path: &Path) -> Result<Vec<Activity>> {
300        let batches = self.read_batches(path)?;
301        let mut activities = Vec::new();
302
303        for batch in batches {
304            activities.extend(Self::batch_to_activities(&batch)?);
305        }
306
307        Ok(activities)
308    }
309
    /// Convert activities into a single Arrow `RecordBatch`.
    ///
    /// Column order here must stay in sync with `batch_to_activities`, which
    /// reads the columns back positionally (by index, not by name).
    fn activities_to_batch(activities: Vec<&Activity>) -> Result<RecordBatch> {
        // Build one Arrow array per column. Collecting from an iterator of
        // `Option<T>` produces a nullable array; `None` becomes a null cell.
        let activity_id: Int64Array = activities.iter().map(|a| a.activity_id).collect();
        let profile_id: Int32Array = activities.iter().map(|a| a.profile_id).collect();
        let activity_name: StringArray = activities
            .iter()
            .map(|a| a.activity_name.as_deref())
            .collect();
        let activity_type: StringArray = activities
            .iter()
            .map(|a| a.activity_type.as_deref())
            .collect();
        // Timestamps are stored as microseconds since the Unix epoch, with no
        // timezone attached (see the schema below).
        let start_time_local: TimestampMicrosecondArray = activities
            .iter()
            .map(|a| a.start_time_local.map(|t| t.timestamp_micros()))
            .collect();
        let start_time_gmt: TimestampMicrosecondArray = activities
            .iter()
            .map(|a| a.start_time_gmt.map(|t| t.timestamp_micros()))
            .collect();
        let duration_sec: Float64Array = activities.iter().map(|a| a.duration_sec).collect();
        let distance_m: Float64Array = activities.iter().map(|a| a.distance_m).collect();
        let calories: Int32Array = activities.iter().map(|a| a.calories).collect();
        let avg_hr: Int32Array = activities.iter().map(|a| a.avg_hr).collect();
        let max_hr: Int32Array = activities.iter().map(|a| a.max_hr).collect();
        let avg_speed: Float64Array = activities.iter().map(|a| a.avg_speed).collect();
        let max_speed: Float64Array = activities.iter().map(|a| a.max_speed).collect();
        let elevation_gain: Float64Array = activities.iter().map(|a| a.elevation_gain).collect();
        let elevation_loss: Float64Array = activities.iter().map(|a| a.elevation_loss).collect();
        let avg_cadence: Float64Array = activities.iter().map(|a| a.avg_cadence).collect();
        let avg_power: Int32Array = activities.iter().map(|a| a.avg_power).collect();
        let normalized_power: Int32Array = activities.iter().map(|a| a.normalized_power).collect();
        let training_effect: Float64Array = activities.iter().map(|a| a.training_effect).collect();
        let training_load: Float64Array = activities.iter().map(|a| a.training_load).collect();
        let start_lat: Float64Array = activities.iter().map(|a| a.start_lat).collect();
        let start_lon: Float64Array = activities.iter().map(|a| a.start_lon).collect();
        let end_lat: Float64Array = activities.iter().map(|a| a.end_lat).collect();
        let end_lon: Float64Array = activities.iter().map(|a| a.end_lon).collect();
        let ground_contact_time: Float64Array =
            activities.iter().map(|a| a.ground_contact_time).collect();
        let vertical_oscillation: Float64Array =
            activities.iter().map(|a| a.vertical_oscillation).collect();
        let stride_length: Float64Array = activities.iter().map(|a| a.stride_length).collect();
        let location_name: StringArray = activities
            .iter()
            .map(|a| a.location_name.as_deref())
            .collect();
        // raw_json is serialized to its string form for storage; it is parsed
        // back in `batch_to_activities`.
        let raw_json: StringArray = activities
            .iter()
            .map(|a| a.raw_json.as_ref().map(|j| j.to_string()))
            .collect();

        // Schema: only the two key columns are non-nullable. Field order must
        // match the column vec passed to `RecordBatch::try_new` below.
        let schema = Arc::new(Schema::new(vec![
            Field::new("activity_id", DataType::Int64, false),
            Field::new("profile_id", DataType::Int32, false),
            Field::new("activity_name", DataType::Utf8, true),
            Field::new("activity_type", DataType::Utf8, true),
            Field::new(
                "start_time_local",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "start_time_gmt",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
                true,
            ),
            Field::new("duration_sec", DataType::Float64, true),
            Field::new("distance_m", DataType::Float64, true),
            Field::new("calories", DataType::Int32, true),
            Field::new("avg_hr", DataType::Int32, true),
            Field::new("max_hr", DataType::Int32, true),
            Field::new("avg_speed", DataType::Float64, true),
            Field::new("max_speed", DataType::Float64, true),
            Field::new("elevation_gain", DataType::Float64, true),
            Field::new("elevation_loss", DataType::Float64, true),
            Field::new("avg_cadence", DataType::Float64, true),
            Field::new("avg_power", DataType::Int32, true),
            Field::new("normalized_power", DataType::Int32, true),
            Field::new("training_effect", DataType::Float64, true),
            Field::new("training_load", DataType::Float64, true),
            Field::new("start_lat", DataType::Float64, true),
            Field::new("start_lon", DataType::Float64, true),
            Field::new("end_lat", DataType::Float64, true),
            Field::new("end_lon", DataType::Float64, true),
            Field::new("ground_contact_time", DataType::Float64, true),
            Field::new("vertical_oscillation", DataType::Float64, true),
            Field::new("stride_length", DataType::Float64, true),
            Field::new("location_name", DataType::Utf8, true),
            Field::new("raw_json", DataType::Utf8, true),
        ]));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(activity_id),
                Arc::new(profile_id),
                Arc::new(activity_name),
                Arc::new(activity_type),
                Arc::new(start_time_local),
                Arc::new(start_time_gmt),
                Arc::new(duration_sec),
                Arc::new(distance_m),
                Arc::new(calories),
                Arc::new(avg_hr),
                Arc::new(max_hr),
                Arc::new(avg_speed),
                Arc::new(max_speed),
                Arc::new(elevation_gain),
                Arc::new(elevation_loss),
                Arc::new(avg_cadence),
                Arc::new(avg_power),
                Arc::new(normalized_power),
                Arc::new(training_effect),
                Arc::new(training_load),
                Arc::new(start_lat),
                Arc::new(start_lon),
                Arc::new(end_lat),
                Arc::new(end_lon),
                Arc::new(ground_contact_time),
                Arc::new(vertical_oscillation),
                Arc::new(stride_length),
                Arc::new(location_name),
                Arc::new(raw_json),
            ],
        )
        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
    }
437
    /// Decode a `RecordBatch` (as written by `activities_to_batch`) back into
    /// `Activity` values.
    ///
    /// Columns are accessed positionally, so the downcasts below must stay in
    /// exactly the order of the schema in `activities_to_batch`. The
    /// `unwrap()`s on the downcasts are invariant checks: they only fail if a
    /// file was written with a different schema.
    fn batch_to_activities(batch: &RecordBatch) -> Result<Vec<Activity>> {
        let len = batch.num_rows();
        let mut activities = Vec::with_capacity(len);

        // Downcast every column once, up front, before the row loop.
        let activity_id = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let profile_id = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let activity_name = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let activity_type = batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let start_time_local = batch
            .column(4)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        let start_time_gmt = batch
            .column(5)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        let duration_sec = batch
            .column(6)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let distance_m = batch
            .column(7)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let calories = batch
            .column(8)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let avg_hr = batch
            .column(9)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let max_hr = batch
            .column(10)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let avg_speed = batch
            .column(11)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let max_speed = batch
            .column(12)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let elevation_gain = batch
            .column(13)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let elevation_loss = batch
            .column(14)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let avg_cadence = batch
            .column(15)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let avg_power = batch
            .column(16)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let normalized_power = batch
            .column(17)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let training_effect = batch
            .column(18)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let training_load = batch
            .column(19)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let start_lat = batch
            .column(20)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let start_lon = batch
            .column(21)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let end_lat = batch
            .column(22)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let end_lon = batch
            .column(23)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let ground_contact_time = batch
            .column(24)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let vertical_oscillation = batch
            .column(25)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let stride_length = batch
            .column(26)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let location_name = batch
            .column(27)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let raw_json = batch
            .column(28)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        // Row loop: null cells (`!is_valid`) become `None` fields.
        for i in 0..len {
            activities.push(Activity {
                activity_id: activity_id.value(i),
                profile_id: profile_id.value(i),
                activity_name: activity_name
                    .is_valid(i)
                    .then(|| activity_name.value(i).to_string()),
                activity_type: activity_type
                    .is_valid(i)
                    .then(|| activity_type.value(i).to_string()),
                // NOTE(review): out-of-range microsecond values silently decode
                // to the `DateTime` default via `unwrap_or_default` — confirm
                // that is acceptable rather than surfacing an error.
                start_time_local: start_time_local.is_valid(i).then(|| {
                    DateTime::from_timestamp_micros(start_time_local.value(i)).unwrap_or_default()
                }),
                start_time_gmt: start_time_gmt.is_valid(i).then(|| {
                    DateTime::from_timestamp_micros(start_time_gmt.value(i)).unwrap_or_default()
                }),
                duration_sec: duration_sec.is_valid(i).then(|| duration_sec.value(i)),
                distance_m: distance_m.is_valid(i).then(|| distance_m.value(i)),
                calories: calories.is_valid(i).then(|| calories.value(i)),
                avg_hr: avg_hr.is_valid(i).then(|| avg_hr.value(i)),
                max_hr: max_hr.is_valid(i).then(|| max_hr.value(i)),
                avg_speed: avg_speed.is_valid(i).then(|| avg_speed.value(i)),
                max_speed: max_speed.is_valid(i).then(|| max_speed.value(i)),
                elevation_gain: elevation_gain.is_valid(i).then(|| elevation_gain.value(i)),
                elevation_loss: elevation_loss.is_valid(i).then(|| elevation_loss.value(i)),
                avg_cadence: avg_cadence.is_valid(i).then(|| avg_cadence.value(i)),
                avg_power: avg_power.is_valid(i).then(|| avg_power.value(i)),
                normalized_power: normalized_power
                    .is_valid(i)
                    .then(|| normalized_power.value(i)),
                training_effect: training_effect
                    .is_valid(i)
                    .then(|| training_effect.value(i)),
                training_load: training_load.is_valid(i).then(|| training_load.value(i)),
                start_lat: start_lat.is_valid(i).then(|| start_lat.value(i)),
                start_lon: start_lon.is_valid(i).then(|| start_lon.value(i)),
                end_lat: end_lat.is_valid(i).then(|| end_lat.value(i)),
                end_lon: end_lon.is_valid(i).then(|| end_lon.value(i)),
                ground_contact_time: ground_contact_time
                    .is_valid(i)
                    .then(|| ground_contact_time.value(i)),
                vertical_oscillation: vertical_oscillation
                    .is_valid(i)
                    .then(|| vertical_oscillation.value(i)),
                stride_length: stride_length.is_valid(i).then(|| stride_length.value(i)),
                location_name: location_name
                    .is_valid(i)
                    .then(|| location_name.value(i).to_string()),
                // NOTE(review): unparseable JSON silently falls back to the
                // default value via `unwrap_or_default` — confirm acceptable.
                raw_json: raw_json
                    .is_valid(i)
                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
            });
        }

        Ok(activities)
    }
644
645    // =========================================================================
646    // Daily Health
647    // =========================================================================
648
649    /// Upsert daily health records
650    pub fn upsert_daily_health(&self, records: &[DailyHealth]) -> Result<()> {
651        self.ensure_dir(EntityType::DailyHealth)?;
652
653        // Group by partition key
654        let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
655            std::collections::HashMap::new();
656
657        for record in records {
658            let key = EntityType::DailyHealth.partition_key(record.date);
659            partitions.entry(key).or_default().push(record.clone());
660        }
661
662        // For each partition, read existing and merge
663        for (key, mut new_records) in partitions {
664            let path = self.partition_path(EntityType::DailyHealth, &key);
665
666            // Read existing
667            let mut existing = self.read_daily_health_from_path(&path)?;
668
669            // Create set of new dates for fast lookup
670            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
671                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
672
673            // Keep existing records that aren't being replaced
674            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
675
676            // Merge
677            existing.append(&mut new_records);
678
679            // Sort by date for consistent ordering
680            existing.sort_by_key(|r| (r.profile_id, r.date));
681
682            // Write merged
683            let refs: Vec<&DailyHealth> = existing.iter().collect();
684            let batch = Self::daily_health_to_batch(refs)?;
685            self.write_batch(&path, &batch)?;
686        }
687
688        Ok(())
689    }
690
691    /// Upsert daily health records with partition-level locking for concurrent writes
692    pub async fn upsert_daily_health_async(&self, records: &[DailyHealth]) -> Result<()> {
693        self.ensure_dir(EntityType::DailyHealth)?;
694
695        // Group by partition key
696        let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
697            std::collections::HashMap::new();
698
699        for record in records {
700            let key = EntityType::DailyHealth.partition_key(record.date);
701            partitions.entry(key).or_default().push(record.clone());
702        }
703
704        // For each partition, acquire lock then read/merge/write
705        for (key, mut new_records) in partitions {
706            // Acquire partition lock
707            let lock = self.get_partition_lock(&key);
708            let _guard = lock.lock().await;
709
710            let path = self.partition_path(EntityType::DailyHealth, &key);
711
712            // Read existing
713            let mut existing = self.read_daily_health_from_path(&path)?;
714
715            // Create set of new dates for fast lookup
716            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
717                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
718
719            // Keep existing records that aren't being replaced
720            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
721
722            // Merge
723            existing.append(&mut new_records);
724
725            // Sort by date for consistent ordering
726            existing.sort_by_key(|r| (r.profile_id, r.date));
727
728            // Write merged
729            let refs: Vec<&DailyHealth> = existing.iter().collect();
730            let batch = Self::daily_health_to_batch(refs)?;
731            self.write_batch(&path, &batch)?;
732        }
733
734        Ok(())
735    }
736
737    fn read_daily_health_from_path(&self, path: &Path) -> Result<Vec<DailyHealth>> {
738        let batches = self.read_batches(path)?;
739        let mut records = Vec::new();
740
741        for batch in batches {
742            records.extend(Self::batch_to_daily_health(&batch)?);
743        }
744
745        Ok(records)
746    }
747
    /// Convert `DailyHealth` records into an Arrow `RecordBatch` for Parquet storage.
    ///
    /// The column build order, the schema field order below, and the column
    /// indices used by `batch_to_daily_health` must all stay in sync — a
    /// mismatch silently stores values under the wrong column name.
    /// Collecting from `Option` iterators yields nullable Arrow arrays, so
    /// `None` fields become Arrow nulls.
    fn daily_health_to_batch(records: Vec<&DailyHealth>) -> Result<RecordBatch> {
        let id: Int64Array = records.iter().map(|r| r.id).collect();
        let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
        // Dates are stored as Date32: whole days since the Unix epoch (1970-01-01).
        let date: Date32Array = records
            .iter()
            .map(|r| {
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
                Some((r.date - epoch).num_days() as i32)
            })
            .collect();
        let steps: Int32Array = records.iter().map(|r| r.steps).collect();
        let step_goal: Int32Array = records.iter().map(|r| r.step_goal).collect();
        let total_calories: Int32Array = records.iter().map(|r| r.total_calories).collect();
        let active_calories: Int32Array = records.iter().map(|r| r.active_calories).collect();
        let bmr_calories: Int32Array = records.iter().map(|r| r.bmr_calories).collect();
        let resting_hr: Int32Array = records.iter().map(|r| r.resting_hr).collect();
        let sleep_seconds: Int32Array = records.iter().map(|r| r.sleep_seconds).collect();
        let deep_sleep_seconds: Int32Array = records.iter().map(|r| r.deep_sleep_seconds).collect();
        let light_sleep_seconds: Int32Array =
            records.iter().map(|r| r.light_sleep_seconds).collect();
        let rem_sleep_seconds: Int32Array = records.iter().map(|r| r.rem_sleep_seconds).collect();
        let sleep_score: Int32Array = records.iter().map(|r| r.sleep_score).collect();
        let avg_stress: Int32Array = records.iter().map(|r| r.avg_stress).collect();
        let max_stress: Int32Array = records.iter().map(|r| r.max_stress).collect();
        let body_battery_start: Int32Array = records.iter().map(|r| r.body_battery_start).collect();
        let body_battery_end: Int32Array = records.iter().map(|r| r.body_battery_end).collect();
        let hrv_weekly_avg: Int32Array = records.iter().map(|r| r.hrv_weekly_avg).collect();
        let hrv_last_night: Int32Array = records.iter().map(|r| r.hrv_last_night).collect();
        let hrv_status: StringArray = records.iter().map(|r| r.hrv_status.as_deref()).collect();
        let avg_respiration: Float64Array = records.iter().map(|r| r.avg_respiration).collect();
        let avg_spo2: Int32Array = records.iter().map(|r| r.avg_spo2).collect();
        let lowest_spo2: Int32Array = records.iter().map(|r| r.lowest_spo2).collect();
        let hydration_ml: Int32Array = records.iter().map(|r| r.hydration_ml).collect();
        let moderate_intensity_min: Int32Array =
            records.iter().map(|r| r.moderate_intensity_min).collect();
        let vigorous_intensity_min: Int32Array =
            records.iter().map(|r| r.vigorous_intensity_min).collect();
        // raw_json is persisted as its string rendering, decoded back on read.
        let raw_json: StringArray = records
            .iter()
            .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
            .collect();

        // Nullability mirrors the model: only profile_id and date are required.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("profile_id", DataType::Int32, false),
            Field::new("date", DataType::Date32, false),
            Field::new("steps", DataType::Int32, true),
            Field::new("step_goal", DataType::Int32, true),
            Field::new("total_calories", DataType::Int32, true),
            Field::new("active_calories", DataType::Int32, true),
            Field::new("bmr_calories", DataType::Int32, true),
            Field::new("resting_hr", DataType::Int32, true),
            Field::new("sleep_seconds", DataType::Int32, true),
            Field::new("deep_sleep_seconds", DataType::Int32, true),
            Field::new("light_sleep_seconds", DataType::Int32, true),
            Field::new("rem_sleep_seconds", DataType::Int32, true),
            Field::new("sleep_score", DataType::Int32, true),
            Field::new("avg_stress", DataType::Int32, true),
            Field::new("max_stress", DataType::Int32, true),
            Field::new("body_battery_start", DataType::Int32, true),
            Field::new("body_battery_end", DataType::Int32, true),
            Field::new("hrv_weekly_avg", DataType::Int32, true),
            Field::new("hrv_last_night", DataType::Int32, true),
            Field::new("hrv_status", DataType::Utf8, true),
            Field::new("avg_respiration", DataType::Float64, true),
            Field::new("avg_spo2", DataType::Int32, true),
            Field::new("lowest_spo2", DataType::Int32, true),
            Field::new("hydration_ml", DataType::Int32, true),
            Field::new("moderate_intensity_min", DataType::Int32, true),
            Field::new("vigorous_intensity_min", DataType::Int32, true),
            Field::new("raw_json", DataType::Utf8, true),
        ]));

        // Arrays must be passed in exactly the schema's field order.
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(id),
                Arc::new(profile_id),
                Arc::new(date),
                Arc::new(steps),
                Arc::new(step_goal),
                Arc::new(total_calories),
                Arc::new(active_calories),
                Arc::new(bmr_calories),
                Arc::new(resting_hr),
                Arc::new(sleep_seconds),
                Arc::new(deep_sleep_seconds),
                Arc::new(light_sleep_seconds),
                Arc::new(rem_sleep_seconds),
                Arc::new(sleep_score),
                Arc::new(avg_stress),
                Arc::new(max_stress),
                Arc::new(body_battery_start),
                Arc::new(body_battery_end),
                Arc::new(hrv_weekly_avg),
                Arc::new(hrv_last_night),
                Arc::new(hrv_status),
                Arc::new(avg_respiration),
                Arc::new(avg_spo2),
                Arc::new(lowest_spo2),
                Arc::new(hydration_ml),
                Arc::new(moderate_intensity_min),
                Arc::new(vigorous_intensity_min),
                Arc::new(raw_json),
            ],
        )
        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
    }
856
    /// Decode an Arrow `RecordBatch` back into `DailyHealth` records.
    ///
    /// Column indices 0..=27 must stay in sync with the schema produced by
    /// `daily_health_to_batch`.
    ///
    /// NOTE(review): the downcasts `unwrap()` — a stored file whose schema has
    /// drifted (e.g. written by an older version) would panic here rather than
    /// return an `Err`; confirm that is the intended contract for on-disk data.
    fn batch_to_daily_health(batch: &RecordBatch) -> Result<Vec<DailyHealth>> {
        let len = batch.num_rows();
        let mut records = Vec::with_capacity(len);

        // Downcast every column once up front; the per-row loop below reads by
        // index and uses `is_valid` to map Arrow nulls back to `None`.
        let id = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let profile_id = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let date = batch
            .column(2)
            .as_any()
            .downcast_ref::<Date32Array>()
            .unwrap();
        let steps = batch
            .column(3)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let step_goal = batch
            .column(4)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let total_calories = batch
            .column(5)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let active_calories = batch
            .column(6)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let bmr_calories = batch
            .column(7)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let resting_hr = batch
            .column(8)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let sleep_seconds = batch
            .column(9)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let deep_sleep_seconds = batch
            .column(10)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let light_sleep_seconds = batch
            .column(11)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let rem_sleep_seconds = batch
            .column(12)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let sleep_score = batch
            .column(13)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let avg_stress = batch
            .column(14)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let max_stress = batch
            .column(15)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let body_battery_start = batch
            .column(16)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let body_battery_end = batch
            .column(17)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hrv_weekly_avg = batch
            .column(18)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hrv_last_night = batch
            .column(19)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hrv_status = batch
            .column(20)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let avg_respiration = batch
            .column(21)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let avg_spo2 = batch
            .column(22)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let lowest_spo2 = batch
            .column(23)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let hydration_ml = batch
            .column(24)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let moderate_intensity_min = batch
            .column(25)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let vigorous_intensity_min = batch
            .column(26)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let raw_json = batch
            .column(27)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        // Date32 values are whole days since the Unix epoch.
        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

        for i in 0..len {
            records.push(DailyHealth {
                id: id.is_valid(i).then(|| id.value(i)),
                profile_id: profile_id.value(i),
                date: epoch + chrono::Duration::days(date.value(i) as i64),
                steps: steps.is_valid(i).then(|| steps.value(i)),
                step_goal: step_goal.is_valid(i).then(|| step_goal.value(i)),
                total_calories: total_calories.is_valid(i).then(|| total_calories.value(i)),
                active_calories: active_calories
                    .is_valid(i)
                    .then(|| active_calories.value(i)),
                bmr_calories: bmr_calories.is_valid(i).then(|| bmr_calories.value(i)),
                resting_hr: resting_hr.is_valid(i).then(|| resting_hr.value(i)),
                sleep_seconds: sleep_seconds.is_valid(i).then(|| sleep_seconds.value(i)),
                deep_sleep_seconds: deep_sleep_seconds
                    .is_valid(i)
                    .then(|| deep_sleep_seconds.value(i)),
                light_sleep_seconds: light_sleep_seconds
                    .is_valid(i)
                    .then(|| light_sleep_seconds.value(i)),
                rem_sleep_seconds: rem_sleep_seconds
                    .is_valid(i)
                    .then(|| rem_sleep_seconds.value(i)),
                sleep_score: sleep_score.is_valid(i).then(|| sleep_score.value(i)),
                avg_stress: avg_stress.is_valid(i).then(|| avg_stress.value(i)),
                max_stress: max_stress.is_valid(i).then(|| max_stress.value(i)),
                body_battery_start: body_battery_start
                    .is_valid(i)
                    .then(|| body_battery_start.value(i)),
                body_battery_end: body_battery_end
                    .is_valid(i)
                    .then(|| body_battery_end.value(i)),
                hrv_weekly_avg: hrv_weekly_avg.is_valid(i).then(|| hrv_weekly_avg.value(i)),
                hrv_last_night: hrv_last_night.is_valid(i).then(|| hrv_last_night.value(i)),
                hrv_status: hrv_status
                    .is_valid(i)
                    .then(|| hrv_status.value(i).to_string()),
                avg_respiration: avg_respiration
                    .is_valid(i)
                    .then(|| avg_respiration.value(i)),
                avg_spo2: avg_spo2.is_valid(i).then(|| avg_spo2.value(i)),
                lowest_spo2: lowest_spo2.is_valid(i).then(|| lowest_spo2.value(i)),
                hydration_ml: hydration_ml.is_valid(i).then(|| hydration_ml.value(i)),
                moderate_intensity_min: moderate_intensity_min
                    .is_valid(i)
                    .then(|| moderate_intensity_min.value(i)),
                vigorous_intensity_min: vigorous_intensity_min
                    .is_valid(i)
                    .then(|| vigorous_intensity_min.value(i)),
                // Best-effort JSON decode: unparsable stored JSON falls back to
                // the field type's default instead of failing the whole read.
                raw_json: raw_json
                    .is_valid(i)
                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
            });
        }

        Ok(records)
    }
1061
1062    // =========================================================================
1063    // Track Points
1064    // =========================================================================
1065
1066    /// Write track points for an activity
1067    pub fn write_track_points(
1068        &self,
1069        activity_date: NaiveDate,
1070        points: &[TrackPoint],
1071    ) -> Result<()> {
1072        self.ensure_dir(EntityType::TrackPoints)?;
1073
1074        let key = EntityType::TrackPoints.partition_key(activity_date);
1075        let path = self.partition_path(EntityType::TrackPoints, &key);
1076
1077        // Read existing, filter out points for the same activity, add new
1078        let mut existing = self.read_track_points_from_path(&path)?;
1079
1080        if let Some(first) = points.first() {
1081            existing.retain(|p| p.activity_id != first.activity_id);
1082        }
1083
1084        existing.extend(points.iter().cloned());
1085
1086        // Sort by activity_id, then timestamp
1087        existing.sort_by(|a, b| {
1088            a.activity_id
1089                .cmp(&b.activity_id)
1090                .then(a.timestamp.cmp(&b.timestamp))
1091        });
1092
1093        let refs: Vec<&TrackPoint> = existing.iter().collect();
1094        let batch = Self::track_points_to_batch(refs)?;
1095        self.write_batch(&path, &batch)?;
1096
1097        Ok(())
1098    }
1099
1100    /// Write track points with partition-level locking for concurrent writes
1101    pub async fn write_track_points_async(
1102        &self,
1103        activity_date: NaiveDate,
1104        points: &[TrackPoint],
1105    ) -> Result<()> {
1106        self.ensure_dir(EntityType::TrackPoints)?;
1107
1108        let key = EntityType::TrackPoints.partition_key(activity_date);
1109
1110        // Acquire partition lock
1111        let lock = self.get_partition_lock(&key);
1112        let _guard = lock.lock().await;
1113
1114        let path = self.partition_path(EntityType::TrackPoints, &key);
1115
1116        // Read existing, filter out points for the same activity, add new
1117        let mut existing = self.read_track_points_from_path(&path)?;
1118
1119        if let Some(first) = points.first() {
1120            existing.retain(|p| p.activity_id != first.activity_id);
1121        }
1122
1123        existing.extend(points.iter().cloned());
1124
1125        // Sort by activity_id, then timestamp
1126        existing.sort_by(|a, b| {
1127            a.activity_id
1128                .cmp(&b.activity_id)
1129                .then(a.timestamp.cmp(&b.timestamp))
1130        });
1131
1132        let refs: Vec<&TrackPoint> = existing.iter().collect();
1133        let batch = Self::track_points_to_batch(refs)?;
1134        self.write_batch(&path, &batch)?;
1135
1136        Ok(())
1137    }
1138
1139    fn read_track_points_from_path(&self, path: &Path) -> Result<Vec<TrackPoint>> {
1140        let batches = self.read_batches(path)?;
1141        let mut points = Vec::new();
1142
1143        for batch in batches {
1144            points.extend(Self::batch_to_track_points(&batch)?);
1145        }
1146
1147        Ok(points)
1148    }
1149
    /// Convert `TrackPoint` records into an Arrow `RecordBatch`.
    ///
    /// Column build order, schema field order, and the column indices used by
    /// `batch_to_track_points` must all stay in sync. Timestamps are stored as
    /// microseconds since the Unix epoch with no timezone (`Timestamp(Microsecond, None)`).
    fn track_points_to_batch(points: Vec<&TrackPoint>) -> Result<RecordBatch> {
        let id: Int64Array = points.iter().map(|p| p.id).collect();
        let activity_id: Int64Array = points.iter().map(|p| p.activity_id).collect();
        let timestamp: TimestampMicrosecondArray = points
            .iter()
            .map(|p| Some(p.timestamp.timestamp_micros()))
            .collect();
        let lat: Float64Array = points.iter().map(|p| p.lat).collect();
        let lon: Float64Array = points.iter().map(|p| p.lon).collect();
        let elevation: Float64Array = points.iter().map(|p| p.elevation).collect();
        let heart_rate: Int32Array = points.iter().map(|p| p.heart_rate).collect();
        let cadence: Int32Array = points.iter().map(|p| p.cadence).collect();
        let power: Int32Array = points.iter().map(|p| p.power).collect();
        let speed: Float64Array = points.iter().map(|p| p.speed).collect();

        // Only activity_id and timestamp are required; sensor fields are nullable.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("activity_id", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
                false,
            ),
            Field::new("lat", DataType::Float64, true),
            Field::new("lon", DataType::Float64, true),
            Field::new("elevation", DataType::Float64, true),
            Field::new("heart_rate", DataType::Int32, true),
            Field::new("cadence", DataType::Int32, true),
            Field::new("power", DataType::Int32, true),
            Field::new("speed", DataType::Float64, true),
        ]));

        // Arrays must be passed in exactly the schema's field order.
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(id),
                Arc::new(activity_id),
                Arc::new(timestamp),
                Arc::new(lat),
                Arc::new(lon),
                Arc::new(elevation),
                Arc::new(heart_rate),
                Arc::new(cadence),
                Arc::new(power),
                Arc::new(speed),
            ],
        )
        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
    }
1199
    /// Decode an Arrow `RecordBatch` back into `TrackPoint` records.
    ///
    /// Column indices 0..=9 must stay in sync with the schema produced by
    /// `track_points_to_batch`.
    ///
    /// NOTE(review): the downcasts `unwrap()` — a schema-drifted stored file
    /// would panic here rather than return an `Err`; and an out-of-range
    /// timestamp silently decodes to `DateTime::default()` — confirm both are
    /// acceptable for on-disk data.
    fn batch_to_track_points(batch: &RecordBatch) -> Result<Vec<TrackPoint>> {
        let len = batch.num_rows();
        let mut points = Vec::with_capacity(len);

        // Downcast every column once up front; the per-row loop below reads by
        // index and uses `is_valid` to map Arrow nulls back to `None`.
        let id = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let activity_id = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let timestamp = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        let lat = batch
            .column(3)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let lon = batch
            .column(4)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let elevation = batch
            .column(5)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let heart_rate = batch
            .column(6)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let cadence = batch
            .column(7)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let power = batch
            .column(8)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let speed = batch
            .column(9)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        for i in 0..len {
            points.push(TrackPoint {
                id: id.is_valid(i).then(|| id.value(i)),
                activity_id: activity_id.value(i),
                timestamp: DateTime::from_timestamp_micros(timestamp.value(i)).unwrap_or_default(),
                lat: lat.is_valid(i).then(|| lat.value(i)),
                lon: lon.is_valid(i).then(|| lon.value(i)),
                elevation: elevation.is_valid(i).then(|| elevation.value(i)),
                heart_rate: heart_rate.is_valid(i).then(|| heart_rate.value(i)),
                cadence: cadence.is_valid(i).then(|| cadence.value(i)),
                power: power.is_valid(i).then(|| power.value(i)),
                speed: speed.is_valid(i).then(|| speed.value(i)),
            });
        }

        Ok(points)
    }
1272
1273    // =========================================================================
1274    // Performance Metrics
1275    // =========================================================================
1276
1277    /// Upsert performance metrics records
1278    pub fn upsert_performance_metrics(&self, records: &[PerformanceMetrics]) -> Result<()> {
1279        self.ensure_dir(EntityType::PerformanceMetrics)?;
1280
1281        // Group by partition key
1282        let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
1283            std::collections::HashMap::new();
1284
1285        for record in records {
1286            let key = EntityType::PerformanceMetrics.partition_key(record.date);
1287            partitions.entry(key).or_default().push(record.clone());
1288        }
1289
1290        // For each partition, read existing and merge
1291        for (key, mut new_records) in partitions {
1292            let path = self.partition_path(EntityType::PerformanceMetrics, &key);
1293
1294            // Read existing
1295            let mut existing = self.read_performance_metrics_from_path(&path)?;
1296
1297            // Create set of new dates for fast lookup
1298            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1299                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1300
1301            // Keep existing records that aren't being replaced
1302            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1303
1304            // Merge
1305            existing.append(&mut new_records);
1306
1307            // Sort by date for consistent ordering
1308            existing.sort_by_key(|r| (r.profile_id, r.date));
1309
1310            // Write merged
1311            let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
1312            let batch = Self::performance_metrics_to_batch(refs)?;
1313            self.write_batch(&path, &batch)?;
1314        }
1315
1316        Ok(())
1317    }
1318
    /// Upsert performance metrics records with partition-level locking for concurrent writes
    ///
    /// Same merge semantics as `upsert_performance_metrics`, but each
    /// partition's read-merge-write cycle runs while holding that partition's
    /// mutex, so concurrent workers cannot lose each other's updates.
    ///
    /// NOTE(review): the lock key is the raw partition key with no entity-type
    /// prefix, so partitions of different entity types that happen to share a
    /// key string share one lock — harmless (extra contention only), but worth
    /// confirming against `EntityType::partition_key`.
    pub async fn upsert_performance_metrics_async(
        &self,
        records: &[PerformanceMetrics],
    ) -> Result<()> {
        self.ensure_dir(EntityType::PerformanceMetrics)?;

        // Group by partition key
        let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
            std::collections::HashMap::new();

        for record in records {
            let key = EntityType::PerformanceMetrics.partition_key(record.date);
            partitions.entry(key).or_default().push(record.clone());
        }

        // For each partition, acquire lock then read/merge/write
        for (key, mut new_records) in partitions {
            // Acquire partition lock; the guard stays alive until the end of
            // this loop iteration, covering the read, merge, and write below.
            let lock = self.get_partition_lock(&key);
            let _guard = lock.lock().await;

            let path = self.partition_path(EntityType::PerformanceMetrics, &key);

            // Read existing
            let mut existing = self.read_performance_metrics_from_path(&path)?;

            // Create set of new dates for fast lookup
            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
                new_records.iter().map(|r| (r.profile_id, r.date)).collect();

            // Keep existing records that aren't being replaced
            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));

            // Merge
            existing.append(&mut new_records);

            // Sort by date for consistent ordering
            existing.sort_by_key(|r| (r.profile_id, r.date));

            // Write merged
            let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
            let batch = Self::performance_metrics_to_batch(refs)?;
            self.write_batch(&path, &batch)?;
        }

        Ok(())
    }
1367
1368    fn read_performance_metrics_from_path(&self, path: &Path) -> Result<Vec<PerformanceMetrics>> {
1369        let batches = self.read_batches(path)?;
1370        let mut records = Vec::new();
1371
1372        for batch in batches {
1373            records.extend(Self::batch_to_performance_metrics(&batch)?);
1374        }
1375
1376        Ok(records)
1377    }
1378
    /// Convert `PerformanceMetrics` records into an Arrow `RecordBatch`.
    ///
    /// Column build order, schema field order, and the column indices used by
    /// `batch_to_performance_metrics` must all stay in sync. Dates are stored
    /// as `Date32` (days since the Unix epoch); `raw_json` is serialized to
    /// its string form for storage.
    fn performance_metrics_to_batch(records: Vec<&PerformanceMetrics>) -> Result<RecordBatch> {
        let id: Int64Array = records.iter().map(|r| r.id).collect();
        let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
        let date: Date32Array = records
            .iter()
            .map(|r| {
                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
                Some((r.date - epoch).num_days() as i32)
            })
            .collect();
        let vo2max: Float64Array = records.iter().map(|r| r.vo2max).collect();
        let fitness_age: Int32Array = records.iter().map(|r| r.fitness_age).collect();
        let training_readiness: Int32Array = records.iter().map(|r| r.training_readiness).collect();
        let training_status: StringArray = records
            .iter()
            .map(|r| r.training_status.as_deref())
            .collect();
        let lactate_threshold_hr: Int32Array =
            records.iter().map(|r| r.lactate_threshold_hr).collect();
        let lactate_threshold_pace: Float64Array =
            records.iter().map(|r| r.lactate_threshold_pace).collect();
        let race_5k_sec: Int32Array = records.iter().map(|r| r.race_5k_sec).collect();
        let race_10k_sec: Int32Array = records.iter().map(|r| r.race_10k_sec).collect();
        let race_half_sec: Int32Array = records.iter().map(|r| r.race_half_sec).collect();
        let race_marathon_sec: Int32Array = records.iter().map(|r| r.race_marathon_sec).collect();
        let endurance_score: Int32Array = records.iter().map(|r| r.endurance_score).collect();
        let hill_score: Int32Array = records.iter().map(|r| r.hill_score).collect();
        let raw_json: StringArray = records
            .iter()
            .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
            .collect();

        // Nullability mirrors the model: only profile_id and date are required.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("profile_id", DataType::Int32, false),
            Field::new("date", DataType::Date32, false),
            Field::new("vo2max", DataType::Float64, true),
            Field::new("fitness_age", DataType::Int32, true),
            Field::new("training_readiness", DataType::Int32, true),
            Field::new("training_status", DataType::Utf8, true),
            Field::new("lactate_threshold_hr", DataType::Int32, true),
            Field::new("lactate_threshold_pace", DataType::Float64, true),
            Field::new("race_5k_sec", DataType::Int32, true),
            Field::new("race_10k_sec", DataType::Int32, true),
            Field::new("race_half_sec", DataType::Int32, true),
            Field::new("race_marathon_sec", DataType::Int32, true),
            Field::new("endurance_score", DataType::Int32, true),
            Field::new("hill_score", DataType::Int32, true),
            Field::new("raw_json", DataType::Utf8, true),
        ]));

        // Arrays must be passed in exactly the schema's field order.
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(id),
                Arc::new(profile_id),
                Arc::new(date),
                Arc::new(vo2max),
                Arc::new(fitness_age),
                Arc::new(training_readiness),
                Arc::new(training_status),
                Arc::new(lactate_threshold_hr),
                Arc::new(lactate_threshold_pace),
                Arc::new(race_5k_sec),
                Arc::new(race_10k_sec),
                Arc::new(race_half_sec),
                Arc::new(race_marathon_sec),
                Arc::new(endurance_score),
                Arc::new(hill_score),
                Arc::new(raw_json),
            ],
        )
        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
    }
1453
1454    fn batch_to_performance_metrics(batch: &RecordBatch) -> Result<Vec<PerformanceMetrics>> {
1455        let len = batch.num_rows();
1456        let mut records = Vec::with_capacity(len);
1457
1458        let id = batch
1459            .column(0)
1460            .as_any()
1461            .downcast_ref::<Int64Array>()
1462            .unwrap();
1463        let profile_id = batch
1464            .column(1)
1465            .as_any()
1466            .downcast_ref::<Int32Array>()
1467            .unwrap();
1468        let date = batch
1469            .column(2)
1470            .as_any()
1471            .downcast_ref::<Date32Array>()
1472            .unwrap();
1473        let vo2max = batch
1474            .column(3)
1475            .as_any()
1476            .downcast_ref::<Float64Array>()
1477            .unwrap();
1478        let fitness_age = batch
1479            .column(4)
1480            .as_any()
1481            .downcast_ref::<Int32Array>()
1482            .unwrap();
1483        let training_readiness = batch
1484            .column(5)
1485            .as_any()
1486            .downcast_ref::<Int32Array>()
1487            .unwrap();
1488        let training_status = batch
1489            .column(6)
1490            .as_any()
1491            .downcast_ref::<StringArray>()
1492            .unwrap();
1493        let lactate_threshold_hr = batch
1494            .column(7)
1495            .as_any()
1496            .downcast_ref::<Int32Array>()
1497            .unwrap();
1498        let lactate_threshold_pace = batch
1499            .column(8)
1500            .as_any()
1501            .downcast_ref::<Float64Array>()
1502            .unwrap();
1503        let race_5k_sec = batch
1504            .column(9)
1505            .as_any()
1506            .downcast_ref::<Int32Array>()
1507            .unwrap();
1508        let race_10k_sec = batch
1509            .column(10)
1510            .as_any()
1511            .downcast_ref::<Int32Array>()
1512            .unwrap();
1513        let race_half_sec = batch
1514            .column(11)
1515            .as_any()
1516            .downcast_ref::<Int32Array>()
1517            .unwrap();
1518        let race_marathon_sec = batch
1519            .column(12)
1520            .as_any()
1521            .downcast_ref::<Int32Array>()
1522            .unwrap();
1523        let endurance_score = batch
1524            .column(13)
1525            .as_any()
1526            .downcast_ref::<Int32Array>()
1527            .unwrap();
1528        let hill_score = batch
1529            .column(14)
1530            .as_any()
1531            .downcast_ref::<Int32Array>()
1532            .unwrap();
1533        let raw_json = batch
1534            .column(15)
1535            .as_any()
1536            .downcast_ref::<StringArray>()
1537            .unwrap();
1538
1539        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1540
1541        for i in 0..len {
1542            records.push(PerformanceMetrics {
1543                id: id.is_valid(i).then(|| id.value(i)),
1544                profile_id: profile_id.value(i),
1545                date: epoch + chrono::Duration::days(date.value(i) as i64),
1546                vo2max: vo2max.is_valid(i).then(|| vo2max.value(i)),
1547                fitness_age: fitness_age.is_valid(i).then(|| fitness_age.value(i)),
1548                training_readiness: training_readiness
1549                    .is_valid(i)
1550                    .then(|| training_readiness.value(i)),
1551                training_status: training_status
1552                    .is_valid(i)
1553                    .then(|| training_status.value(i).to_string()),
1554                lactate_threshold_hr: lactate_threshold_hr
1555                    .is_valid(i)
1556                    .then(|| lactate_threshold_hr.value(i)),
1557                lactate_threshold_pace: lactate_threshold_pace
1558                    .is_valid(i)
1559                    .then(|| lactate_threshold_pace.value(i)),
1560                race_5k_sec: race_5k_sec.is_valid(i).then(|| race_5k_sec.value(i)),
1561                race_10k_sec: race_10k_sec.is_valid(i).then(|| race_10k_sec.value(i)),
1562                race_half_sec: race_half_sec.is_valid(i).then(|| race_half_sec.value(i)),
1563                race_marathon_sec: race_marathon_sec
1564                    .is_valid(i)
1565                    .then(|| race_marathon_sec.value(i)),
1566                endurance_score: endurance_score
1567                    .is_valid(i)
1568                    .then(|| endurance_score.value(i)),
1569                hill_score: hill_score.is_valid(i).then(|| hill_score.value(i)),
1570                raw_json: raw_json
1571                    .is_valid(i)
1572                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
1573            });
1574        }
1575
1576        Ok(records)
1577    }
1578
1579    // =========================================================================
1580    // Weight
1581    // =========================================================================
1582
1583    /// Upsert weight entries
1584    pub fn upsert_weight(&self, records: &[WeightEntry]) -> Result<()> {
1585        self.ensure_dir(EntityType::Weight)?;
1586
1587        // Group by partition key
1588        let mut partitions: std::collections::HashMap<String, Vec<WeightEntry>> =
1589            std::collections::HashMap::new();
1590
1591        for record in records {
1592            let key = EntityType::Weight.partition_key(record.date);
1593            partitions.entry(key).or_default().push(record.clone());
1594        }
1595
1596        // For each partition, read existing and merge
1597        for (key, mut new_records) in partitions {
1598            let path = self.partition_path(EntityType::Weight, &key);
1599
1600            // Read existing
1601            let mut existing = self.read_weight_from_path(&path)?;
1602
1603            // Create set of new dates for fast lookup
1604            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1605                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1606
1607            // Keep existing records that aren't being replaced
1608            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1609
1610            // Merge
1611            existing.append(&mut new_records);
1612
1613            // Sort by date for consistent ordering
1614            existing.sort_by_key(|r| (r.profile_id, r.date));
1615
1616            // Write merged
1617            let refs: Vec<&WeightEntry> = existing.iter().collect();
1618            let batch = Self::weight_to_batch(refs)?;
1619            self.write_batch(&path, &batch)?;
1620        }
1621
1622        Ok(())
1623    }
1624
1625    fn read_weight_from_path(&self, path: &Path) -> Result<Vec<WeightEntry>> {
1626        let batches = self.read_batches(path)?;
1627        let mut records = Vec::new();
1628
1629        for batch in batches {
1630            records.extend(Self::batch_to_weight(&batch)?);
1631        }
1632
1633        Ok(records)
1634    }
1635
1636    fn weight_to_batch(records: Vec<&WeightEntry>) -> Result<RecordBatch> {
1637        let id: Int64Array = records.iter().map(|r| r.id).collect();
1638        let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
1639        let date: Date32Array = records
1640            .iter()
1641            .map(|r| {
1642                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1643                Some((r.date - epoch).num_days() as i32)
1644            })
1645            .collect();
1646        let weight_kg: Float64Array = records.iter().map(|r| r.weight_kg).collect();
1647        let bmi: Float64Array = records.iter().map(|r| r.bmi).collect();
1648        let body_fat_pct: Float64Array = records.iter().map(|r| r.body_fat_pct).collect();
1649        let muscle_mass_kg: Float64Array = records.iter().map(|r| r.muscle_mass_kg).collect();
1650
1651        let schema = Arc::new(Schema::new(vec![
1652            Field::new("id", DataType::Int64, true),
1653            Field::new("profile_id", DataType::Int32, false),
1654            Field::new("date", DataType::Date32, false),
1655            Field::new("weight_kg", DataType::Float64, true),
1656            Field::new("bmi", DataType::Float64, true),
1657            Field::new("body_fat_pct", DataType::Float64, true),
1658            Field::new("muscle_mass_kg", DataType::Float64, true),
1659        ]));
1660
1661        RecordBatch::try_new(
1662            schema,
1663            vec![
1664                Arc::new(id),
1665                Arc::new(profile_id),
1666                Arc::new(date),
1667                Arc::new(weight_kg),
1668                Arc::new(bmi),
1669                Arc::new(body_fat_pct),
1670                Arc::new(muscle_mass_kg),
1671            ],
1672        )
1673        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1674    }
1675
1676    fn batch_to_weight(batch: &RecordBatch) -> Result<Vec<WeightEntry>> {
1677        let len = batch.num_rows();
1678        let mut records = Vec::with_capacity(len);
1679
1680        let id = batch
1681            .column(0)
1682            .as_any()
1683            .downcast_ref::<Int64Array>()
1684            .unwrap();
1685        let profile_id = batch
1686            .column(1)
1687            .as_any()
1688            .downcast_ref::<Int32Array>()
1689            .unwrap();
1690        let date = batch
1691            .column(2)
1692            .as_any()
1693            .downcast_ref::<Date32Array>()
1694            .unwrap();
1695        let weight_kg = batch
1696            .column(3)
1697            .as_any()
1698            .downcast_ref::<Float64Array>()
1699            .unwrap();
1700        let bmi = batch
1701            .column(4)
1702            .as_any()
1703            .downcast_ref::<Float64Array>()
1704            .unwrap();
1705        let body_fat_pct = batch
1706            .column(5)
1707            .as_any()
1708            .downcast_ref::<Float64Array>()
1709            .unwrap();
1710        let muscle_mass_kg = batch
1711            .column(6)
1712            .as_any()
1713            .downcast_ref::<Float64Array>()
1714            .unwrap();
1715
1716        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1717
1718        for i in 0..len {
1719            records.push(WeightEntry {
1720                id: id.is_valid(i).then(|| id.value(i)),
1721                profile_id: profile_id.value(i),
1722                date: epoch + chrono::Duration::days(date.value(i) as i64),
1723                weight_kg: weight_kg.is_valid(i).then(|| weight_kg.value(i)),
1724                bmi: bmi.is_valid(i).then(|| bmi.value(i)),
1725                body_fat_pct: body_fat_pct.is_valid(i).then(|| body_fat_pct.value(i)),
1726                muscle_mass_kg: muscle_mass_kg.is_valid(i).then(|| muscle_mass_kg.value(i)),
1727            });
1728        }
1729
1730        Ok(records)
1731    }
1732
1733    // =========================================================================
1734    // Profiles
1735    // =========================================================================
1736
1737    /// Write profiles (overwrites entire file)
1738    pub fn write_profiles(&self, profiles: &[Profile]) -> Result<()> {
1739        fs::create_dir_all(&self.base_path)
1740            .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
1741
1742        let path = self.partition_path(EntityType::Profiles, "");
1743        let refs: Vec<&Profile> = profiles.iter().collect();
1744        let batch = Self::profiles_to_batch(refs)?;
1745        self.write_batch(&path, &batch)?;
1746
1747        Ok(())
1748    }
1749
1750    /// Read all profiles
1751    pub fn read_profiles(&self) -> Result<Vec<Profile>> {
1752        let path = self.partition_path(EntityType::Profiles, "");
1753        let batches = self.read_batches(&path)?;
1754        let mut profiles = Vec::new();
1755
1756        for batch in batches {
1757            profiles.extend(Self::batch_to_profiles(&batch)?);
1758        }
1759
1760        Ok(profiles)
1761    }
1762
1763    fn profiles_to_batch(profiles: Vec<&Profile>) -> Result<RecordBatch> {
1764        let profile_id: Int32Array = profiles.iter().map(|p| p.profile_id).collect();
1765        let display_name: StringArray = profiles
1766            .iter()
1767            .map(|p| Some(p.display_name.as_str()))
1768            .collect();
1769        let user_id: Int64Array = profiles.iter().map(|p| p.user_id).collect();
1770        let created_at: TimestampMicrosecondArray = profiles
1771            .iter()
1772            .map(|p| p.created_at.map(|t| t.timestamp_micros()))
1773            .collect();
1774        let last_sync_at: TimestampMicrosecondArray = profiles
1775            .iter()
1776            .map(|p| p.last_sync_at.map(|t| t.timestamp_micros()))
1777            .collect();
1778
1779        let schema = Arc::new(Schema::new(vec![
1780            Field::new("profile_id", DataType::Int32, false),
1781            Field::new("display_name", DataType::Utf8, false),
1782            Field::new("user_id", DataType::Int64, true),
1783            Field::new(
1784                "created_at",
1785                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1786                true,
1787            ),
1788            Field::new(
1789                "last_sync_at",
1790                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1791                true,
1792            ),
1793        ]));
1794
1795        RecordBatch::try_new(
1796            schema,
1797            vec![
1798                Arc::new(profile_id),
1799                Arc::new(display_name),
1800                Arc::new(user_id),
1801                Arc::new(created_at),
1802                Arc::new(last_sync_at),
1803            ],
1804        )
1805        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1806    }
1807
1808    fn batch_to_profiles(batch: &RecordBatch) -> Result<Vec<Profile>> {
1809        let len = batch.num_rows();
1810        let mut profiles = Vec::with_capacity(len);
1811
1812        let profile_id = batch
1813            .column(0)
1814            .as_any()
1815            .downcast_ref::<Int32Array>()
1816            .unwrap();
1817        let display_name = batch
1818            .column(1)
1819            .as_any()
1820            .downcast_ref::<StringArray>()
1821            .unwrap();
1822        let user_id = batch
1823            .column(2)
1824            .as_any()
1825            .downcast_ref::<Int64Array>()
1826            .unwrap();
1827        let created_at = batch
1828            .column(3)
1829            .as_any()
1830            .downcast_ref::<TimestampMicrosecondArray>()
1831            .unwrap();
1832        let last_sync_at = batch
1833            .column(4)
1834            .as_any()
1835            .downcast_ref::<TimestampMicrosecondArray>()
1836            .unwrap();
1837
1838        for i in 0..len {
1839            profiles.push(Profile {
1840                profile_id: profile_id.value(i),
1841                display_name: display_name.value(i).to_string(),
1842                user_id: user_id.is_valid(i).then(|| user_id.value(i)),
1843                created_at: created_at.is_valid(i).then(|| {
1844                    DateTime::from_timestamp_micros(created_at.value(i)).unwrap_or_default()
1845                }),
1846                last_sync_at: last_sync_at.is_valid(i).then(|| {
1847                    DateTime::from_timestamp_micros(last_sync_at.value(i)).unwrap_or_default()
1848                }),
1849            });
1850        }
1851
1852        Ok(profiles)
1853    }
1854
    /// Get the base path for external readers to use with DuckDB glob queries
    ///
    /// Returns a borrow of the store's root directory; callers can build
    /// glob patterns like `{base_path}/activities/*.parquet` against it.
    pub fn base_path(&self) -> &Path {
        &self.base_path
    }
1859}
1860
1861#[cfg(test)]
1862mod tests {
1863    use super::*;
1864    use tempfile::TempDir;
1865
    // Round-trip: an activity written via upsert_activities must read back
    // from its partition file with fields intact.
    #[test]
    fn test_activity_round_trip() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        // Fully-populated activity; timestamp 1703001600 = 2023-12-19 UTC.
        let activities = vec![Activity {
            activity_id: 123,
            profile_id: 1,
            activity_name: Some("Morning Run".to_string()),
            activity_type: Some("running".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            duration_sec: Some(3600.0),
            distance_m: Some(10000.0),
            calories: Some(500),
            avg_hr: Some(150),
            max_hr: Some(175),
            avg_speed: Some(2.78),
            max_speed: Some(3.5),
            elevation_gain: Some(100.0),
            elevation_loss: Some(100.0),
            avg_cadence: Some(180.0),
            avg_power: None,
            normalized_power: None,
            training_effect: Some(3.5),
            training_load: Some(120.0),
            start_lat: Some(37.7749),
            start_lon: Some(-122.4194),
            end_lat: Some(37.7849),
            end_lon: Some(-122.4094),
            ground_contact_time: Some(250.0),
            vertical_oscillation: Some(8.5),
            stride_length: Some(1.2),
            location_name: Some("San Francisco".to_string()),
            raw_json: None,
        }];

        store.upsert_activities(&activities).unwrap();

        // Read back from the partition derived from the activity's local date.
        let key = EntityType::Activities
            .partition_key(activities[0].start_time_local.unwrap().date_naive());
        let path = store.partition_path(EntityType::Activities, &key);
        let read_back = store.read_activities_from_path(&path).unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].activity_id, 123);
        assert_eq!(read_back[0].activity_name, Some("Morning Run".to_string()));
    }
1915
    // Round-trip: a daily-health record written via upsert_daily_health must
    // read back from its partition file with the same values and date.
    #[test]
    fn test_daily_health_round_trip() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        let records = vec![DailyHealth {
            id: None,
            profile_id: 1,
            date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
            steps: Some(10000),
            step_goal: Some(8000),
            total_calories: Some(2500),
            active_calories: Some(500),
            bmr_calories: Some(2000),
            resting_hr: Some(55),
            sleep_seconds: Some(28800),
            deep_sleep_seconds: Some(7200),
            light_sleep_seconds: Some(14400),
            rem_sleep_seconds: Some(7200),
            sleep_score: Some(85),
            avg_stress: Some(30),
            max_stress: Some(75),
            body_battery_start: Some(95),
            body_battery_end: Some(45),
            hrv_weekly_avg: Some(45),
            hrv_last_night: Some(48),
            hrv_status: Some("balanced".to_string()),
            avg_respiration: Some(14.5),
            avg_spo2: Some(96),
            lowest_spo2: Some(92),
            hydration_ml: Some(2500),
            moderate_intensity_min: Some(30),
            vigorous_intensity_min: Some(20),
            raw_json: None,
        }];

        store.upsert_daily_health(&records).unwrap();

        // Read back from the partition derived from the record's date.
        let key = EntityType::DailyHealth.partition_key(records[0].date);
        let path = store.partition_path(EntityType::DailyHealth, &key);
        let read_back = store.read_daily_health_from_path(&path).unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].steps, Some(10000));
        assert_eq!(
            read_back[0].date,
            NaiveDate::from_ymd_opt(2024, 12, 15).unwrap()
        );
    }
1966
    // has_daily_health must be false for an empty store and become true
    // after a record for that (profile, date) is upserted.
    #[test]
    fn test_has_daily_health() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());
        let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();

        // Nothing written yet.
        assert!(!store.has_daily_health(1, date).unwrap());

        // Minimal record: only steps set, everything else absent.
        let records = vec![DailyHealth {
            id: None,
            profile_id: 1,
            date,
            steps: Some(10000),
            step_goal: None,
            total_calories: None,
            active_calories: None,
            bmr_calories: None,
            resting_hr: None,
            sleep_seconds: None,
            deep_sleep_seconds: None,
            light_sleep_seconds: None,
            rem_sleep_seconds: None,
            sleep_score: None,
            avg_stress: None,
            max_stress: None,
            body_battery_start: None,
            body_battery_end: None,
            hrv_weekly_avg: None,
            hrv_last_night: None,
            hrv_status: None,
            avg_respiration: None,
            avg_spo2: None,
            lowest_spo2: None,
            hydration_ml: None,
            moderate_intensity_min: None,
            vigorous_intensity_min: None,
            raw_json: None,
        }];

        store.upsert_daily_health(&records).unwrap();
        assert!(store.has_daily_health(1, date).unwrap());
    }
2009
2010    #[test]
2011    fn test_has_track_points() {
2012        let temp = TempDir::new().unwrap();
2013        let store = ParquetStore::new(temp.path());
2014        let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();
2015
2016        assert!(!store.has_track_points(42, date).unwrap());
2017
2018        let points = vec![TrackPoint {
2019            id: None,
2020            activity_id: 42,
2021            timestamp: DateTime::from_timestamp(1703001600, 0).unwrap(),
2022            lat: Some(1.0),
2023            lon: Some(2.0),
2024            elevation: Some(3.0),
2025            heart_rate: None,
2026            cadence: None,
2027            power: None,
2028            speed: None,
2029        }];
2030
2031        store.write_track_points(date, &points).unwrap();
2032        assert!(store.has_track_points(42, date).unwrap());
2033    }
2034
    // A reader of one partition and a writer of a DIFFERENT partition run
    // concurrently (synchronized by a barrier); neither should block or
    // corrupt the other.
    #[test]
    fn test_concurrent_read_write() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let temp = TempDir::new().unwrap();
        let store = Arc::new(ParquetStore::new(temp.path()));

        // Write initial data (timestamp 1703001600 = 2023-12-19, week 51).
        let initial_activities = vec![Activity {
            activity_id: 1,
            profile_id: 1,
            activity_name: Some("Initial Activity".to_string()),
            activity_type: Some("running".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            duration_sec: Some(3600.0),
            distance_m: Some(10000.0),
            calories: None,
            avg_hr: None,
            max_hr: None,
            avg_speed: None,
            max_speed: None,
            elevation_gain: None,
            elevation_loss: None,
            avg_cadence: None,
            avg_power: None,
            normalized_power: None,
            training_effect: None,
            training_load: None,
            start_lat: None,
            start_lon: None,
            end_lat: None,
            end_lon: None,
            ground_contact_time: None,
            vertical_oscillation: None,
            stride_length: None,
            location_name: None,
            raw_json: None,
        }];
        store.upsert_activities(&initial_activities).unwrap();

        // Barrier to coordinate reader/writer threads so they overlap.
        let barrier = Arc::new(Barrier::new(2));
        let store_reader = Arc::clone(&store);
        let barrier_reader = Arc::clone(&barrier);

        // Reader thread: reads existing data while writer writes new data
        let reader_handle = thread::spawn(move || {
            barrier_reader.wait(); // Sync with writer

            // Read the existing file (partition for 2023-12-19).
            let key = EntityType::Activities
                .partition_key(NaiveDate::from_ymd_opt(2023, 12, 19).unwrap());
            let path = store_reader.partition_path(EntityType::Activities, &key);
            let activities = store_reader.read_activities_from_path(&path).unwrap();

            // Should see at least the initial activity
            assert!(!activities.is_empty(), "Reader should see initial data");
            activities.len()
        });

        // Writer thread: writes new data to a DIFFERENT partition
        let writer_handle = thread::spawn(move || {
            barrier.wait(); // Sync with reader

            // Write to a different partition (different week)
            let new_activities = vec![Activity {
                activity_id: 2,
                profile_id: 1,
                activity_name: Some("Concurrent Activity".to_string()),
                activity_type: Some("cycling".to_string()),
                start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()), // Different week
                start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
                duration_sec: Some(7200.0),
                distance_m: Some(50000.0),
                calories: None,
                avg_hr: None,
                max_hr: None,
                avg_speed: None,
                max_speed: None,
                elevation_gain: None,
                elevation_loss: None,
                avg_cadence: None,
                avg_power: None,
                normalized_power: None,
                training_effect: None,
                training_load: None,
                start_lat: None,
                start_lon: None,
                end_lat: None,
                end_lon: None,
                ground_contact_time: None,
                vertical_oscillation: None,
                stride_length: None,
                location_name: None,
                raw_json: None,
            }];
            store.upsert_activities(&new_activities).unwrap();
            2 // Return activity id written
        });

        // Wait for both threads to complete; a panic in either fails the test.
        let read_count = reader_handle.join().expect("Reader thread panicked");
        let written_id = writer_handle.join().expect("Writer thread panicked");

        assert_eq!(read_count, 1, "Reader should have read 1 activity");
        assert_eq!(written_id, 2, "Writer should have written activity 2");
    }
2144
    // External-reader contract: activities written to two different weekly
    // partitions must be visible to DuckDB via a single glob query over the
    // Parquet files.
    #[test]
    fn test_duckdb_glob_query() {
        use duckdb::Connection;

        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        // Write activities to multiple partitions
        let activities = vec![
            Activity {
                activity_id: 1,
                profile_id: 1,
                activity_name: Some("Week 51 Run".to_string()),
                activity_type: Some("running".to_string()),
                start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()), // 2023-W51
                start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
                duration_sec: Some(3600.0),
                distance_m: Some(10000.0),
                calories: None,
                avg_hr: None,
                max_hr: None,
                avg_speed: None,
                max_speed: None,
                elevation_gain: None,
                elevation_loss: None,
                avg_cadence: None,
                avg_power: None,
                normalized_power: None,
                training_effect: None,
                training_load: None,
                start_lat: None,
                start_lon: None,
                end_lat: None,
                end_lon: None,
                ground_contact_time: None,
                vertical_oscillation: None,
                stride_length: None,
                location_name: None,
                raw_json: None,
            },
            Activity {
                activity_id: 2,
                profile_id: 1,
                activity_name: Some("Week 52 Ride".to_string()),
                activity_type: Some("cycling".to_string()),
                start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()), // 2023-W52
                start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
                duration_sec: Some(7200.0),
                distance_m: Some(50000.0),
                calories: None,
                avg_hr: None,
                max_hr: None,
                avg_speed: None,
                max_speed: None,
                elevation_gain: None,
                elevation_loss: None,
                avg_cadence: None,
                avg_power: None,
                normalized_power: None,
                training_effect: None,
                training_load: None,
                start_lat: None,
                start_lon: None,
                end_lat: None,
                end_lon: None,
                ground_contact_time: None,
                vertical_oscillation: None,
                stride_length: None,
                location_name: None,
                raw_json: None,
            },
        ];
        store.upsert_activities(&activities).unwrap();

        // Use DuckDB to query all Parquet files with glob pattern
        let conn = Connection::open_in_memory().unwrap();
        let glob_pattern = format!("{}/*.parquet", temp.path().join("activities").display());

        let mut stmt = conn
            .prepare(&format!(
                "SELECT activity_id, activity_name FROM '{}' ORDER BY activity_id",
                glob_pattern
            ))
            .unwrap();

        let results: Vec<(i64, String)> = stmt
            .query_map([], |row| {
                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
            })
            .unwrap()
            .filter_map(|r| r.ok())
            .collect();

        // Should see both activities from different partitions
        assert_eq!(results.len(), 2);
        assert_eq!(results[0], (1, "Week 51 Run".to_string()));
        assert_eq!(results[1], (2, "Week 52 Ride".to_string()));
    }
2243}