Skip to main content

garmin_cli/storage/
parquet.rs

//! Parquet read/write utilities for time-partitioned storage.
//!
//! Uses Arrow record batches for efficient columnar storage.
//! Supports concurrent writes to different partitions via partition-level locks.
5
6use std::fs::{self, File};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::*;
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow::record_batch::RecordBatch;
13use chrono::{DateTime, NaiveDate};
14use dashmap::DashMap;
15use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
16use parquet::arrow::ArrowWriter;
17use parquet::basic::Compression;
18use parquet::file::properties::WriterProperties;
19use tokio::sync::Mutex as TokioMutex;
20
21use crate::db::models::{
22    Activity, DailyHealth, PerformanceMetrics, Profile, TrackPoint, WeightEntry,
23};
24use crate::error::{GarminError, Result};
25
26use super::partitions::EntityType;
27
28/// Parquet storage for Garmin data
29///
30/// Supports concurrent writes to different partitions. Each partition has its own
31/// lock to prevent lost updates when multiple workers write to the same partition.
32#[derive(Clone)]
33pub struct ParquetStore {
34    base_path: PathBuf,
35    /// Per-partition locks for concurrent write safety
36    partition_locks: Arc<DashMap<String, Arc<TokioMutex<()>>>>,
37}
38
39impl ParquetStore {
40    /// Create a new ParquetStore at the given base path
41    pub fn new(base_path: impl Into<PathBuf>) -> Self {
42        Self {
43            base_path: base_path.into(),
44            partition_locks: Arc::new(DashMap::new()),
45        }
46    }
47
48    /// Get or create a lock for a specific partition
49    fn get_partition_lock(&self, partition_key: &str) -> Arc<TokioMutex<()>> {
50        self.partition_locks
51            .entry(partition_key.to_string())
52            .or_insert_with(|| Arc::new(TokioMutex::new(())))
53            .clone()
54    }
55
56    /// Check if daily health data exists for a profile/date
57    pub fn has_daily_health(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
58        let key = EntityType::DailyHealth.partition_key(date);
59        let path = self.partition_path(EntityType::DailyHealth, &key);
60        if !path.exists() {
61            return Ok(false);
62        }
63        let records = self.read_daily_health_from_path(&path)?;
64        Ok(records
65            .iter()
66            .any(|r| r.profile_id == profile_id && r.date == date))
67    }
68
69    /// Check if performance metrics exist for a profile/date
70    pub fn has_performance_metrics(&self, profile_id: i32, date: NaiveDate) -> Result<bool> {
71        let key = EntityType::PerformanceMetrics.partition_key(date);
72        let path = self.partition_path(EntityType::PerformanceMetrics, &key);
73        if !path.exists() {
74            return Ok(false);
75        }
76        let records = self.read_performance_metrics_from_path(&path)?;
77        Ok(records
78            .iter()
79            .any(|r| r.profile_id == profile_id && r.date == date))
80    }
81
82    /// Check if track points exist for an activity/date partition
83    pub fn has_track_points(&self, activity_id: i64, date: NaiveDate) -> Result<bool> {
84        let key = EntityType::TrackPoints.partition_key(date);
85        let path = self.partition_path(EntityType::TrackPoints, &key);
86        if !path.exists() {
87            return Ok(false);
88        }
89        let records = self.read_track_points_from_path(&path)?;
90        Ok(records.iter().any(|r| r.activity_id == activity_id))
91    }
92
93    /// Get the full path for a partition file
94    pub fn partition_path(&self, entity: EntityType, partition_key: &str) -> PathBuf {
95        match entity {
96            EntityType::Profiles => self.base_path.join("profiles.parquet"),
97            _ => self
98                .base_path
99                .join(entity.dir_name())
100                .join(format!("{}.parquet", partition_key)),
101        }
102    }
103
104    /// Ensure the directory for an entity exists
105    fn ensure_dir(&self, entity: EntityType) -> Result<()> {
106        if entity != EntityType::Profiles {
107            let dir = self.base_path.join(entity.dir_name());
108            fs::create_dir_all(&dir).map_err(|e| {
109                GarminError::Database(format!("Failed to create directory {:?}: {}", dir, e))
110            })?;
111        } else {
112            fs::create_dir_all(&self.base_path).map_err(|e| {
113                GarminError::Database(format!(
114                    "Failed to create directory {:?}: {}",
115                    self.base_path, e
116                ))
117            })?;
118        }
119        Ok(())
120    }
121
122    /// Write a record batch to a partition file atomically
123    fn write_batch(&self, path: &Path, batch: &RecordBatch) -> Result<()> {
124        // Write to temp file first
125        let temp_path = path.with_extension("parquet.tmp");
126
127        // Ensure parent directory exists
128        if let Some(parent) = path.parent() {
129            fs::create_dir_all(parent)
130                .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
131        }
132
133        let file = File::create(&temp_path)
134            .map_err(|e| GarminError::Database(format!("Failed to create temp file: {}", e)))?;
135
136        let props = WriterProperties::builder()
137            .set_compression(Compression::ZSTD(Default::default()))
138            .build();
139
140        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
141            GarminError::Database(format!("Failed to create Parquet writer: {}", e))
142        })?;
143
144        writer
145            .write(batch)
146            .map_err(|e| GarminError::Database(format!("Failed to write batch: {}", e)))?;
147
148        writer
149            .close()
150            .map_err(|e| GarminError::Database(format!("Failed to close writer: {}", e)))?;
151
152        // Atomic rename
153        fs::rename(&temp_path, path)
154            .map_err(|e| GarminError::Database(format!("Failed to rename temp file: {}", e)))?;
155
156        Ok(())
157    }
158
159    /// Read all record batches from a partition file
160    fn read_batches(&self, path: &Path) -> Result<Vec<RecordBatch>> {
161        if !path.exists() {
162            return Ok(Vec::new());
163        }
164
165        let file = File::open(path)
166            .map_err(|e| GarminError::Database(format!("Failed to open file: {}", e)))?;
167
168        let reader = ParquetRecordBatchReaderBuilder::try_new(file)
169            .map_err(|e| GarminError::Database(format!("Failed to create reader: {}", e)))?
170            .build()
171            .map_err(|e| GarminError::Database(format!("Failed to build reader: {}", e)))?;
172
173        reader
174            .collect::<std::result::Result<Vec<_>, _>>()
175            .map_err(|e| GarminError::Database(format!("Failed to read batches: {}", e)))
176    }
177
178    // =========================================================================
179    // Activities
180    // =========================================================================
181
182    /// Write activities to weekly partitions
183    pub fn write_activities(&self, activities: &[Activity]) -> Result<()> {
184        self.ensure_dir(EntityType::Activities)?;
185
186        // Group by partition key
187        let mut partitions: std::collections::HashMap<String, Vec<&Activity>> =
188            std::collections::HashMap::new();
189
190        for activity in activities {
191            if let Some(start_time) = activity.start_time_local {
192                let key = EntityType::Activities.partition_key(start_time.date_naive());
193                partitions.entry(key).or_default().push(activity);
194            }
195        }
196
197        // Write each partition
198        for (key, partition_activities) in partitions {
199            let path = self.partition_path(EntityType::Activities, &key);
200            let batch = Self::activities_to_batch(partition_activities)?;
201            self.write_batch(&path, &batch)?;
202        }
203
204        Ok(())
205    }
206
207    /// Upsert activities (read existing, merge, write)
208    pub fn upsert_activities(&self, activities: &[Activity]) -> Result<()> {
209        self.ensure_dir(EntityType::Activities)?;
210
211        // Group new activities by partition key
212        let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
213            std::collections::HashMap::new();
214
215        for activity in activities {
216            if let Some(start_time) = activity.start_time_local {
217                let key = EntityType::Activities.partition_key(start_time.date_naive());
218                partitions.entry(key).or_default().push(activity.clone());
219            }
220        }
221
222        // For each partition, read existing and merge
223        for (key, mut new_activities) in partitions {
224            let path = self.partition_path(EntityType::Activities, &key);
225
226            // Read existing
227            let mut existing = self.read_activities_from_path(&path)?;
228
229            // Create set of new activity IDs for fast lookup
230            let new_ids: std::collections::HashSet<i64> =
231                new_activities.iter().map(|a| a.activity_id).collect();
232
233            // Keep existing activities that aren't being replaced
234            existing.retain(|a| !new_ids.contains(&a.activity_id));
235
236            // Merge
237            existing.append(&mut new_activities);
238
239            // Sort by activity_id for consistent ordering
240            existing.sort_by_key(|a| a.activity_id);
241
242            // Write merged
243            let refs: Vec<&Activity> = existing.iter().collect();
244            let batch = Self::activities_to_batch(refs)?;
245            self.write_batch(&path, &batch)?;
246        }
247
248        Ok(())
249    }
250
251    /// Upsert activities with partition-level locking for concurrent writes
252    pub async fn upsert_activities_async(&self, activities: &[Activity]) -> Result<()> {
253        self.ensure_dir(EntityType::Activities)?;
254
255        // Group new activities by partition key
256        let mut partitions: std::collections::HashMap<String, Vec<Activity>> =
257            std::collections::HashMap::new();
258
259        for activity in activities {
260            if let Some(start_time) = activity.start_time_local {
261                let key = EntityType::Activities.partition_key(start_time.date_naive());
262                partitions.entry(key).or_default().push(activity.clone());
263            }
264        }
265
266        // For each partition, acquire lock then read/merge/write
267        for (key, mut new_activities) in partitions {
268            // Acquire partition lock
269            let lock = self.get_partition_lock(&key);
270            let _guard = lock.lock().await;
271
272            let path = self.partition_path(EntityType::Activities, &key);
273
274            // Read existing
275            let mut existing = self.read_activities_from_path(&path)?;
276
277            // Create set of new activity IDs for fast lookup
278            let new_ids: std::collections::HashSet<i64> =
279                new_activities.iter().map(|a| a.activity_id).collect();
280
281            // Keep existing activities that aren't being replaced
282            existing.retain(|a| !new_ids.contains(&a.activity_id));
283
284            // Merge
285            existing.append(&mut new_activities);
286
287            // Sort by activity_id for consistent ordering
288            existing.sort_by_key(|a| a.activity_id);
289
290            // Write merged
291            let refs: Vec<&Activity> = existing.iter().collect();
292            let batch = Self::activities_to_batch(refs)?;
293            self.write_batch(&path, &batch)?;
294        }
295
296        Ok(())
297    }
298
299    fn read_activities_from_path(&self, path: &Path) -> Result<Vec<Activity>> {
300        let batches = self.read_batches(path)?;
301        let mut activities = Vec::new();
302
303        for batch in batches {
304            activities.extend(Self::batch_to_activities(&batch)?);
305        }
306
307        Ok(activities)
308    }
309
310    fn activities_to_batch(activities: Vec<&Activity>) -> Result<RecordBatch> {
311        let activity_id: Int64Array = activities.iter().map(|a| a.activity_id).collect();
312        let profile_id: Int32Array = activities.iter().map(|a| a.profile_id).collect();
313        let activity_name: StringArray = activities
314            .iter()
315            .map(|a| a.activity_name.as_deref())
316            .collect();
317        let activity_type: StringArray = activities
318            .iter()
319            .map(|a| a.activity_type.as_deref())
320            .collect();
321        let start_time_local: TimestampMicrosecondArray = activities
322            .iter()
323            .map(|a| a.start_time_local.map(|t| t.timestamp_micros()))
324            .collect();
325        let start_time_gmt: TimestampMicrosecondArray = activities
326            .iter()
327            .map(|a| a.start_time_gmt.map(|t| t.timestamp_micros()))
328            .collect();
329        let duration_sec: Float64Array = activities.iter().map(|a| a.duration_sec).collect();
330        let distance_m: Float64Array = activities.iter().map(|a| a.distance_m).collect();
331        let calories: Int32Array = activities.iter().map(|a| a.calories).collect();
332        let avg_hr: Int32Array = activities.iter().map(|a| a.avg_hr).collect();
333        let max_hr: Int32Array = activities.iter().map(|a| a.max_hr).collect();
334        let avg_speed: Float64Array = activities.iter().map(|a| a.avg_speed).collect();
335        let max_speed: Float64Array = activities.iter().map(|a| a.max_speed).collect();
336        let elevation_gain: Float64Array = activities.iter().map(|a| a.elevation_gain).collect();
337        let elevation_loss: Float64Array = activities.iter().map(|a| a.elevation_loss).collect();
338        let avg_cadence: Float64Array = activities.iter().map(|a| a.avg_cadence).collect();
339        let avg_power: Int32Array = activities.iter().map(|a| a.avg_power).collect();
340        let normalized_power: Int32Array = activities.iter().map(|a| a.normalized_power).collect();
341        let training_effect: Float64Array = activities.iter().map(|a| a.training_effect).collect();
342        let training_load: Float64Array = activities.iter().map(|a| a.training_load).collect();
343        let start_lat: Float64Array = activities.iter().map(|a| a.start_lat).collect();
344        let start_lon: Float64Array = activities.iter().map(|a| a.start_lon).collect();
345        let end_lat: Float64Array = activities.iter().map(|a| a.end_lat).collect();
346        let end_lon: Float64Array = activities.iter().map(|a| a.end_lon).collect();
347        let ground_contact_time: Float64Array =
348            activities.iter().map(|a| a.ground_contact_time).collect();
349        let vertical_oscillation: Float64Array =
350            activities.iter().map(|a| a.vertical_oscillation).collect();
351        let stride_length: Float64Array = activities.iter().map(|a| a.stride_length).collect();
352        let location_name: StringArray = activities
353            .iter()
354            .map(|a| a.location_name.as_deref())
355            .collect();
356        let raw_json: StringArray = activities
357            .iter()
358            .map(|a| a.raw_json.as_ref().map(|j| j.to_string()))
359            .collect();
360
361        let schema = Arc::new(Schema::new(vec![
362            Field::new("activity_id", DataType::Int64, false),
363            Field::new("profile_id", DataType::Int32, false),
364            Field::new("activity_name", DataType::Utf8, true),
365            Field::new("activity_type", DataType::Utf8, true),
366            Field::new(
367                "start_time_local",
368                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
369                true,
370            ),
371            Field::new(
372                "start_time_gmt",
373                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
374                true,
375            ),
376            Field::new("duration_sec", DataType::Float64, true),
377            Field::new("distance_m", DataType::Float64, true),
378            Field::new("calories", DataType::Int32, true),
379            Field::new("avg_hr", DataType::Int32, true),
380            Field::new("max_hr", DataType::Int32, true),
381            Field::new("avg_speed", DataType::Float64, true),
382            Field::new("max_speed", DataType::Float64, true),
383            Field::new("elevation_gain", DataType::Float64, true),
384            Field::new("elevation_loss", DataType::Float64, true),
385            Field::new("avg_cadence", DataType::Float64, true),
386            Field::new("avg_power", DataType::Int32, true),
387            Field::new("normalized_power", DataType::Int32, true),
388            Field::new("training_effect", DataType::Float64, true),
389            Field::new("training_load", DataType::Float64, true),
390            Field::new("start_lat", DataType::Float64, true),
391            Field::new("start_lon", DataType::Float64, true),
392            Field::new("end_lat", DataType::Float64, true),
393            Field::new("end_lon", DataType::Float64, true),
394            Field::new("ground_contact_time", DataType::Float64, true),
395            Field::new("vertical_oscillation", DataType::Float64, true),
396            Field::new("stride_length", DataType::Float64, true),
397            Field::new("location_name", DataType::Utf8, true),
398            Field::new("raw_json", DataType::Utf8, true),
399        ]));
400
401        RecordBatch::try_new(
402            schema,
403            vec![
404                Arc::new(activity_id),
405                Arc::new(profile_id),
406                Arc::new(activity_name),
407                Arc::new(activity_type),
408                Arc::new(start_time_local),
409                Arc::new(start_time_gmt),
410                Arc::new(duration_sec),
411                Arc::new(distance_m),
412                Arc::new(calories),
413                Arc::new(avg_hr),
414                Arc::new(max_hr),
415                Arc::new(avg_speed),
416                Arc::new(max_speed),
417                Arc::new(elevation_gain),
418                Arc::new(elevation_loss),
419                Arc::new(avg_cadence),
420                Arc::new(avg_power),
421                Arc::new(normalized_power),
422                Arc::new(training_effect),
423                Arc::new(training_load),
424                Arc::new(start_lat),
425                Arc::new(start_lon),
426                Arc::new(end_lat),
427                Arc::new(end_lon),
428                Arc::new(ground_contact_time),
429                Arc::new(vertical_oscillation),
430                Arc::new(stride_length),
431                Arc::new(location_name),
432                Arc::new(raw_json),
433            ],
434        )
435        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
436    }
437
438    fn batch_to_activities(batch: &RecordBatch) -> Result<Vec<Activity>> {
439        let len = batch.num_rows();
440        let mut activities = Vec::with_capacity(len);
441
442        let activity_id = batch
443            .column(0)
444            .as_any()
445            .downcast_ref::<Int64Array>()
446            .unwrap();
447        let profile_id = batch
448            .column(1)
449            .as_any()
450            .downcast_ref::<Int32Array>()
451            .unwrap();
452        let activity_name = batch
453            .column(2)
454            .as_any()
455            .downcast_ref::<StringArray>()
456            .unwrap();
457        let activity_type = batch
458            .column(3)
459            .as_any()
460            .downcast_ref::<StringArray>()
461            .unwrap();
462        let start_time_local = batch
463            .column(4)
464            .as_any()
465            .downcast_ref::<TimestampMicrosecondArray>()
466            .unwrap();
467        let start_time_gmt = batch
468            .column(5)
469            .as_any()
470            .downcast_ref::<TimestampMicrosecondArray>()
471            .unwrap();
472        let duration_sec = batch
473            .column(6)
474            .as_any()
475            .downcast_ref::<Float64Array>()
476            .unwrap();
477        let distance_m = batch
478            .column(7)
479            .as_any()
480            .downcast_ref::<Float64Array>()
481            .unwrap();
482        let calories = batch
483            .column(8)
484            .as_any()
485            .downcast_ref::<Int32Array>()
486            .unwrap();
487        let avg_hr = batch
488            .column(9)
489            .as_any()
490            .downcast_ref::<Int32Array>()
491            .unwrap();
492        let max_hr = batch
493            .column(10)
494            .as_any()
495            .downcast_ref::<Int32Array>()
496            .unwrap();
497        let avg_speed = batch
498            .column(11)
499            .as_any()
500            .downcast_ref::<Float64Array>()
501            .unwrap();
502        let max_speed = batch
503            .column(12)
504            .as_any()
505            .downcast_ref::<Float64Array>()
506            .unwrap();
507        let elevation_gain = batch
508            .column(13)
509            .as_any()
510            .downcast_ref::<Float64Array>()
511            .unwrap();
512        let elevation_loss = batch
513            .column(14)
514            .as_any()
515            .downcast_ref::<Float64Array>()
516            .unwrap();
517        let avg_cadence = batch
518            .column(15)
519            .as_any()
520            .downcast_ref::<Float64Array>()
521            .unwrap();
522        let avg_power = batch
523            .column(16)
524            .as_any()
525            .downcast_ref::<Int32Array>()
526            .unwrap();
527        let normalized_power = batch
528            .column(17)
529            .as_any()
530            .downcast_ref::<Int32Array>()
531            .unwrap();
532        let training_effect = batch
533            .column(18)
534            .as_any()
535            .downcast_ref::<Float64Array>()
536            .unwrap();
537        let training_load = batch
538            .column(19)
539            .as_any()
540            .downcast_ref::<Float64Array>()
541            .unwrap();
542        let start_lat = batch
543            .column(20)
544            .as_any()
545            .downcast_ref::<Float64Array>()
546            .unwrap();
547        let start_lon = batch
548            .column(21)
549            .as_any()
550            .downcast_ref::<Float64Array>()
551            .unwrap();
552        let end_lat = batch
553            .column(22)
554            .as_any()
555            .downcast_ref::<Float64Array>()
556            .unwrap();
557        let end_lon = batch
558            .column(23)
559            .as_any()
560            .downcast_ref::<Float64Array>()
561            .unwrap();
562        let ground_contact_time = batch
563            .column(24)
564            .as_any()
565            .downcast_ref::<Float64Array>()
566            .unwrap();
567        let vertical_oscillation = batch
568            .column(25)
569            .as_any()
570            .downcast_ref::<Float64Array>()
571            .unwrap();
572        let stride_length = batch
573            .column(26)
574            .as_any()
575            .downcast_ref::<Float64Array>()
576            .unwrap();
577        let location_name = batch
578            .column(27)
579            .as_any()
580            .downcast_ref::<StringArray>()
581            .unwrap();
582        let raw_json = batch
583            .column(28)
584            .as_any()
585            .downcast_ref::<StringArray>()
586            .unwrap();
587
588        for i in 0..len {
589            activities.push(Activity {
590                activity_id: activity_id.value(i),
591                profile_id: profile_id.value(i),
592                activity_name: activity_name
593                    .is_valid(i)
594                    .then(|| activity_name.value(i).to_string()),
595                activity_type: activity_type
596                    .is_valid(i)
597                    .then(|| activity_type.value(i).to_string()),
598                start_time_local: start_time_local.is_valid(i).then(|| {
599                    DateTime::from_timestamp_micros(start_time_local.value(i)).unwrap_or_default()
600                }),
601                start_time_gmt: start_time_gmt.is_valid(i).then(|| {
602                    DateTime::from_timestamp_micros(start_time_gmt.value(i)).unwrap_or_default()
603                }),
604                duration_sec: duration_sec.is_valid(i).then(|| duration_sec.value(i)),
605                distance_m: distance_m.is_valid(i).then(|| distance_m.value(i)),
606                calories: calories.is_valid(i).then(|| calories.value(i)),
607                avg_hr: avg_hr.is_valid(i).then(|| avg_hr.value(i)),
608                max_hr: max_hr.is_valid(i).then(|| max_hr.value(i)),
609                avg_speed: avg_speed.is_valid(i).then(|| avg_speed.value(i)),
610                max_speed: max_speed.is_valid(i).then(|| max_speed.value(i)),
611                elevation_gain: elevation_gain.is_valid(i).then(|| elevation_gain.value(i)),
612                elevation_loss: elevation_loss.is_valid(i).then(|| elevation_loss.value(i)),
613                avg_cadence: avg_cadence.is_valid(i).then(|| avg_cadence.value(i)),
614                avg_power: avg_power.is_valid(i).then(|| avg_power.value(i)),
615                normalized_power: normalized_power
616                    .is_valid(i)
617                    .then(|| normalized_power.value(i)),
618                training_effect: training_effect
619                    .is_valid(i)
620                    .then(|| training_effect.value(i)),
621                training_load: training_load.is_valid(i).then(|| training_load.value(i)),
622                start_lat: start_lat.is_valid(i).then(|| start_lat.value(i)),
623                start_lon: start_lon.is_valid(i).then(|| start_lon.value(i)),
624                end_lat: end_lat.is_valid(i).then(|| end_lat.value(i)),
625                end_lon: end_lon.is_valid(i).then(|| end_lon.value(i)),
626                ground_contact_time: ground_contact_time
627                    .is_valid(i)
628                    .then(|| ground_contact_time.value(i)),
629                vertical_oscillation: vertical_oscillation
630                    .is_valid(i)
631                    .then(|| vertical_oscillation.value(i)),
632                stride_length: stride_length.is_valid(i).then(|| stride_length.value(i)),
633                location_name: location_name
634                    .is_valid(i)
635                    .then(|| location_name.value(i).to_string()),
636                raw_json: raw_json
637                    .is_valid(i)
638                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
639            });
640        }
641
642        Ok(activities)
643    }
644
645    // =========================================================================
646    // Daily Health
647    // =========================================================================
648
649    /// Upsert daily health records
650    pub fn upsert_daily_health(&self, records: &[DailyHealth]) -> Result<()> {
651        self.ensure_dir(EntityType::DailyHealth)?;
652
653        // Group by partition key
654        let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
655            std::collections::HashMap::new();
656
657        for record in records {
658            let key = EntityType::DailyHealth.partition_key(record.date);
659            partitions.entry(key).or_default().push(record.clone());
660        }
661
662        // For each partition, read existing and merge
663        for (key, mut new_records) in partitions {
664            let path = self.partition_path(EntityType::DailyHealth, &key);
665
666            // Read existing
667            let mut existing = self.read_daily_health_from_path(&path)?;
668
669            // Create set of new dates for fast lookup
670            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
671                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
672
673            // Keep existing records that aren't being replaced
674            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
675
676            // Merge
677            existing.append(&mut new_records);
678
679            // Sort by date for consistent ordering
680            existing.sort_by_key(|r| (r.profile_id, r.date));
681
682            // Write merged
683            let refs: Vec<&DailyHealth> = existing.iter().collect();
684            let batch = Self::daily_health_to_batch(refs)?;
685            self.write_batch(&path, &batch)?;
686        }
687
688        Ok(())
689    }
690
691    /// Upsert daily health records with partition-level locking for concurrent writes
692    pub async fn upsert_daily_health_async(&self, records: &[DailyHealth]) -> Result<()> {
693        self.ensure_dir(EntityType::DailyHealth)?;
694
695        // Group by partition key
696        let mut partitions: std::collections::HashMap<String, Vec<DailyHealth>> =
697            std::collections::HashMap::new();
698
699        for record in records {
700            let key = EntityType::DailyHealth.partition_key(record.date);
701            partitions.entry(key).or_default().push(record.clone());
702        }
703
704        // For each partition, acquire lock then read/merge/write
705        for (key, mut new_records) in partitions {
706            // Acquire partition lock
707            let lock = self.get_partition_lock(&key);
708            let _guard = lock.lock().await;
709
710            let path = self.partition_path(EntityType::DailyHealth, &key);
711
712            // Read existing
713            let mut existing = self.read_daily_health_from_path(&path)?;
714
715            // Create set of new dates for fast lookup
716            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
717                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
718
719            // Keep existing records that aren't being replaced
720            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
721
722            // Merge
723            existing.append(&mut new_records);
724
725            // Sort by date for consistent ordering
726            existing.sort_by_key(|r| (r.profile_id, r.date));
727
728            // Write merged
729            let refs: Vec<&DailyHealth> = existing.iter().collect();
730            let batch = Self::daily_health_to_batch(refs)?;
731            self.write_batch(&path, &batch)?;
732        }
733
734        Ok(())
735    }
736
737    fn read_daily_health_from_path(&self, path: &Path) -> Result<Vec<DailyHealth>> {
738        let batches = self.read_batches(path)?;
739        let mut records = Vec::new();
740
741        for batch in batches {
742            records.extend(Self::batch_to_daily_health(&batch)?);
743        }
744
745        Ok(records)
746    }
747
748    fn daily_health_to_batch(records: Vec<&DailyHealth>) -> Result<RecordBatch> {
749        let id: Int64Array = records.iter().map(|r| r.id).collect();
750        let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
751        let date: Date32Array = records
752            .iter()
753            .map(|r| {
754                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
755                Some((r.date - epoch).num_days() as i32)
756            })
757            .collect();
758        let steps: Int32Array = records.iter().map(|r| r.steps).collect();
759        let step_goal: Int32Array = records.iter().map(|r| r.step_goal).collect();
760        let total_calories: Int32Array = records.iter().map(|r| r.total_calories).collect();
761        let active_calories: Int32Array = records.iter().map(|r| r.active_calories).collect();
762        let bmr_calories: Int32Array = records.iter().map(|r| r.bmr_calories).collect();
763        let resting_hr: Int32Array = records.iter().map(|r| r.resting_hr).collect();
764        let sleep_seconds: Int32Array = records.iter().map(|r| r.sleep_seconds).collect();
765        let deep_sleep_seconds: Int32Array = records.iter().map(|r| r.deep_sleep_seconds).collect();
766        let light_sleep_seconds: Int32Array =
767            records.iter().map(|r| r.light_sleep_seconds).collect();
768        let rem_sleep_seconds: Int32Array = records.iter().map(|r| r.rem_sleep_seconds).collect();
769        let sleep_score: Int32Array = records.iter().map(|r| r.sleep_score).collect();
770        let avg_stress: Int32Array = records.iter().map(|r| r.avg_stress).collect();
771        let max_stress: Int32Array = records.iter().map(|r| r.max_stress).collect();
772        let body_battery_start: Int32Array = records.iter().map(|r| r.body_battery_start).collect();
773        let body_battery_end: Int32Array = records.iter().map(|r| r.body_battery_end).collect();
774        let hrv_weekly_avg: Int32Array = records.iter().map(|r| r.hrv_weekly_avg).collect();
775        let hrv_last_night: Int32Array = records.iter().map(|r| r.hrv_last_night).collect();
776        let hrv_status: StringArray = records.iter().map(|r| r.hrv_status.as_deref()).collect();
777        let avg_respiration: Float64Array = records.iter().map(|r| r.avg_respiration).collect();
778        let avg_spo2: Int32Array = records.iter().map(|r| r.avg_spo2).collect();
779        let lowest_spo2: Int32Array = records.iter().map(|r| r.lowest_spo2).collect();
780        let hydration_ml: Int32Array = records.iter().map(|r| r.hydration_ml).collect();
781        let moderate_intensity_min: Int32Array =
782            records.iter().map(|r| r.moderate_intensity_min).collect();
783        let vigorous_intensity_min: Int32Array =
784            records.iter().map(|r| r.vigorous_intensity_min).collect();
785        let raw_json: StringArray = records
786            .iter()
787            .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
788            .collect();
789        let sleep_note: StringArray = records.iter().map(|r| r.sleep_note.as_deref()).collect();
790
791        let schema = Arc::new(Schema::new(vec![
792            Field::new("id", DataType::Int64, true),
793            Field::new("profile_id", DataType::Int32, false),
794            Field::new("date", DataType::Date32, false),
795            Field::new("steps", DataType::Int32, true),
796            Field::new("step_goal", DataType::Int32, true),
797            Field::new("total_calories", DataType::Int32, true),
798            Field::new("active_calories", DataType::Int32, true),
799            Field::new("bmr_calories", DataType::Int32, true),
800            Field::new("resting_hr", DataType::Int32, true),
801            Field::new("sleep_seconds", DataType::Int32, true),
802            Field::new("deep_sleep_seconds", DataType::Int32, true),
803            Field::new("light_sleep_seconds", DataType::Int32, true),
804            Field::new("rem_sleep_seconds", DataType::Int32, true),
805            Field::new("sleep_score", DataType::Int32, true),
806            Field::new("avg_stress", DataType::Int32, true),
807            Field::new("max_stress", DataType::Int32, true),
808            Field::new("body_battery_start", DataType::Int32, true),
809            Field::new("body_battery_end", DataType::Int32, true),
810            Field::new("hrv_weekly_avg", DataType::Int32, true),
811            Field::new("hrv_last_night", DataType::Int32, true),
812            Field::new("hrv_status", DataType::Utf8, true),
813            Field::new("avg_respiration", DataType::Float64, true),
814            Field::new("avg_spo2", DataType::Int32, true),
815            Field::new("lowest_spo2", DataType::Int32, true),
816            Field::new("hydration_ml", DataType::Int32, true),
817            Field::new("moderate_intensity_min", DataType::Int32, true),
818            Field::new("vigorous_intensity_min", DataType::Int32, true),
819            Field::new("raw_json", DataType::Utf8, true),
820            Field::new("sleep_note", DataType::Utf8, true),
821        ]));
822
823        RecordBatch::try_new(
824            schema,
825            vec![
826                Arc::new(id),
827                Arc::new(profile_id),
828                Arc::new(date),
829                Arc::new(steps),
830                Arc::new(step_goal),
831                Arc::new(total_calories),
832                Arc::new(active_calories),
833                Arc::new(bmr_calories),
834                Arc::new(resting_hr),
835                Arc::new(sleep_seconds),
836                Arc::new(deep_sleep_seconds),
837                Arc::new(light_sleep_seconds),
838                Arc::new(rem_sleep_seconds),
839                Arc::new(sleep_score),
840                Arc::new(avg_stress),
841                Arc::new(max_stress),
842                Arc::new(body_battery_start),
843                Arc::new(body_battery_end),
844                Arc::new(hrv_weekly_avg),
845                Arc::new(hrv_last_night),
846                Arc::new(hrv_status),
847                Arc::new(avg_respiration),
848                Arc::new(avg_spo2),
849                Arc::new(lowest_spo2),
850                Arc::new(hydration_ml),
851                Arc::new(moderate_intensity_min),
852                Arc::new(vigorous_intensity_min),
853                Arc::new(raw_json),
854                Arc::new(sleep_note),
855            ],
856        )
857        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
858    }
859
860    fn batch_to_daily_health(batch: &RecordBatch) -> Result<Vec<DailyHealth>> {
861        let len = batch.num_rows();
862        let mut records = Vec::with_capacity(len);
863
864        let id = batch
865            .column(0)
866            .as_any()
867            .downcast_ref::<Int64Array>()
868            .unwrap();
869        let profile_id = batch
870            .column(1)
871            .as_any()
872            .downcast_ref::<Int32Array>()
873            .unwrap();
874        let date = batch
875            .column(2)
876            .as_any()
877            .downcast_ref::<Date32Array>()
878            .unwrap();
879        let steps = batch
880            .column(3)
881            .as_any()
882            .downcast_ref::<Int32Array>()
883            .unwrap();
884        let step_goal = batch
885            .column(4)
886            .as_any()
887            .downcast_ref::<Int32Array>()
888            .unwrap();
889        let total_calories = batch
890            .column(5)
891            .as_any()
892            .downcast_ref::<Int32Array>()
893            .unwrap();
894        let active_calories = batch
895            .column(6)
896            .as_any()
897            .downcast_ref::<Int32Array>()
898            .unwrap();
899        let bmr_calories = batch
900            .column(7)
901            .as_any()
902            .downcast_ref::<Int32Array>()
903            .unwrap();
904        let resting_hr = batch
905            .column(8)
906            .as_any()
907            .downcast_ref::<Int32Array>()
908            .unwrap();
909        let sleep_seconds = batch
910            .column(9)
911            .as_any()
912            .downcast_ref::<Int32Array>()
913            .unwrap();
914        let deep_sleep_seconds = batch
915            .column(10)
916            .as_any()
917            .downcast_ref::<Int32Array>()
918            .unwrap();
919        let light_sleep_seconds = batch
920            .column(11)
921            .as_any()
922            .downcast_ref::<Int32Array>()
923            .unwrap();
924        let rem_sleep_seconds = batch
925            .column(12)
926            .as_any()
927            .downcast_ref::<Int32Array>()
928            .unwrap();
929        let sleep_score = batch
930            .column(13)
931            .as_any()
932            .downcast_ref::<Int32Array>()
933            .unwrap();
934        let avg_stress = batch
935            .column(14)
936            .as_any()
937            .downcast_ref::<Int32Array>()
938            .unwrap();
939        let max_stress = batch
940            .column(15)
941            .as_any()
942            .downcast_ref::<Int32Array>()
943            .unwrap();
944        let body_battery_start = batch
945            .column(16)
946            .as_any()
947            .downcast_ref::<Int32Array>()
948            .unwrap();
949        let body_battery_end = batch
950            .column(17)
951            .as_any()
952            .downcast_ref::<Int32Array>()
953            .unwrap();
954        let hrv_weekly_avg = batch
955            .column(18)
956            .as_any()
957            .downcast_ref::<Int32Array>()
958            .unwrap();
959        let hrv_last_night = batch
960            .column(19)
961            .as_any()
962            .downcast_ref::<Int32Array>()
963            .unwrap();
964        let hrv_status = batch
965            .column(20)
966            .as_any()
967            .downcast_ref::<StringArray>()
968            .unwrap();
969        let avg_respiration = batch
970            .column(21)
971            .as_any()
972            .downcast_ref::<Float64Array>()
973            .unwrap();
974        let avg_spo2 = batch
975            .column(22)
976            .as_any()
977            .downcast_ref::<Int32Array>()
978            .unwrap();
979        let lowest_spo2 = batch
980            .column(23)
981            .as_any()
982            .downcast_ref::<Int32Array>()
983            .unwrap();
984        let hydration_ml = batch
985            .column(24)
986            .as_any()
987            .downcast_ref::<Int32Array>()
988            .unwrap();
989        let moderate_intensity_min = batch
990            .column(25)
991            .as_any()
992            .downcast_ref::<Int32Array>()
993            .unwrap();
994        let vigorous_intensity_min = batch
995            .column(26)
996            .as_any()
997            .downcast_ref::<Int32Array>()
998            .unwrap();
999        let raw_json = batch
1000            .column(27)
1001            .as_any()
1002            .downcast_ref::<StringArray>()
1003            .unwrap();
1004        // Looked up by name for backward compatibility: Parquet files written
1005        // before this column existed simply yield None.
1006        let sleep_note = batch
1007            .column_by_name("sleep_note")
1008            .and_then(|c| c.as_any().downcast_ref::<StringArray>());
1009
1010        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1011
1012        for i in 0..len {
1013            records.push(DailyHealth {
1014                id: id.is_valid(i).then(|| id.value(i)),
1015                profile_id: profile_id.value(i),
1016                date: epoch + chrono::Duration::days(date.value(i) as i64),
1017                steps: steps.is_valid(i).then(|| steps.value(i)),
1018                step_goal: step_goal.is_valid(i).then(|| step_goal.value(i)),
1019                total_calories: total_calories.is_valid(i).then(|| total_calories.value(i)),
1020                active_calories: active_calories
1021                    .is_valid(i)
1022                    .then(|| active_calories.value(i)),
1023                bmr_calories: bmr_calories.is_valid(i).then(|| bmr_calories.value(i)),
1024                resting_hr: resting_hr.is_valid(i).then(|| resting_hr.value(i)),
1025                sleep_seconds: sleep_seconds.is_valid(i).then(|| sleep_seconds.value(i)),
1026                deep_sleep_seconds: deep_sleep_seconds
1027                    .is_valid(i)
1028                    .then(|| deep_sleep_seconds.value(i)),
1029                light_sleep_seconds: light_sleep_seconds
1030                    .is_valid(i)
1031                    .then(|| light_sleep_seconds.value(i)),
1032                rem_sleep_seconds: rem_sleep_seconds
1033                    .is_valid(i)
1034                    .then(|| rem_sleep_seconds.value(i)),
1035                sleep_score: sleep_score.is_valid(i).then(|| sleep_score.value(i)),
1036                sleep_note: sleep_note
1037                    .and_then(|arr| arr.is_valid(i).then(|| arr.value(i).to_string())),
1038                avg_stress: avg_stress.is_valid(i).then(|| avg_stress.value(i)),
1039                max_stress: max_stress.is_valid(i).then(|| max_stress.value(i)),
1040                body_battery_start: body_battery_start
1041                    .is_valid(i)
1042                    .then(|| body_battery_start.value(i)),
1043                body_battery_end: body_battery_end
1044                    .is_valid(i)
1045                    .then(|| body_battery_end.value(i)),
1046                hrv_weekly_avg: hrv_weekly_avg.is_valid(i).then(|| hrv_weekly_avg.value(i)),
1047                hrv_last_night: hrv_last_night.is_valid(i).then(|| hrv_last_night.value(i)),
1048                hrv_status: hrv_status
1049                    .is_valid(i)
1050                    .then(|| hrv_status.value(i).to_string()),
1051                avg_respiration: avg_respiration
1052                    .is_valid(i)
1053                    .then(|| avg_respiration.value(i)),
1054                avg_spo2: avg_spo2.is_valid(i).then(|| avg_spo2.value(i)),
1055                lowest_spo2: lowest_spo2.is_valid(i).then(|| lowest_spo2.value(i)),
1056                hydration_ml: hydration_ml.is_valid(i).then(|| hydration_ml.value(i)),
1057                moderate_intensity_min: moderate_intensity_min
1058                    .is_valid(i)
1059                    .then(|| moderate_intensity_min.value(i)),
1060                vigorous_intensity_min: vigorous_intensity_min
1061                    .is_valid(i)
1062                    .then(|| vigorous_intensity_min.value(i)),
1063                raw_json: raw_json
1064                    .is_valid(i)
1065                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
1066            });
1067        }
1068
1069        Ok(records)
1070    }
1071
1072    // =========================================================================
1073    // Track Points
1074    // =========================================================================
1075
1076    /// Write track points for an activity
1077    pub fn write_track_points(
1078        &self,
1079        activity_date: NaiveDate,
1080        points: &[TrackPoint],
1081    ) -> Result<()> {
1082        self.ensure_dir(EntityType::TrackPoints)?;
1083
1084        let key = EntityType::TrackPoints.partition_key(activity_date);
1085        let path = self.partition_path(EntityType::TrackPoints, &key);
1086
1087        // Read existing, filter out points for the same activity, add new
1088        let mut existing = self.read_track_points_from_path(&path)?;
1089
1090        if let Some(first) = points.first() {
1091            existing.retain(|p| p.activity_id != first.activity_id);
1092        }
1093
1094        existing.extend(points.iter().cloned());
1095
1096        // Sort by activity_id, then timestamp
1097        existing.sort_by(|a, b| {
1098            a.activity_id
1099                .cmp(&b.activity_id)
1100                .then(a.timestamp.cmp(&b.timestamp))
1101        });
1102
1103        let refs: Vec<&TrackPoint> = existing.iter().collect();
1104        let batch = Self::track_points_to_batch(refs)?;
1105        self.write_batch(&path, &batch)?;
1106
1107        Ok(())
1108    }
1109
1110    /// Write track points with partition-level locking for concurrent writes
1111    pub async fn write_track_points_async(
1112        &self,
1113        activity_date: NaiveDate,
1114        points: &[TrackPoint],
1115    ) -> Result<()> {
1116        self.ensure_dir(EntityType::TrackPoints)?;
1117
1118        let key = EntityType::TrackPoints.partition_key(activity_date);
1119
1120        // Acquire partition lock
1121        let lock = self.get_partition_lock(&key);
1122        let _guard = lock.lock().await;
1123
1124        let path = self.partition_path(EntityType::TrackPoints, &key);
1125
1126        // Read existing, filter out points for the same activity, add new
1127        let mut existing = self.read_track_points_from_path(&path)?;
1128
1129        if let Some(first) = points.first() {
1130            existing.retain(|p| p.activity_id != first.activity_id);
1131        }
1132
1133        existing.extend(points.iter().cloned());
1134
1135        // Sort by activity_id, then timestamp
1136        existing.sort_by(|a, b| {
1137            a.activity_id
1138                .cmp(&b.activity_id)
1139                .then(a.timestamp.cmp(&b.timestamp))
1140        });
1141
1142        let refs: Vec<&TrackPoint> = existing.iter().collect();
1143        let batch = Self::track_points_to_batch(refs)?;
1144        self.write_batch(&path, &batch)?;
1145
1146        Ok(())
1147    }
1148
1149    fn read_track_points_from_path(&self, path: &Path) -> Result<Vec<TrackPoint>> {
1150        let batches = self.read_batches(path)?;
1151        let mut points = Vec::new();
1152
1153        for batch in batches {
1154            points.extend(Self::batch_to_track_points(&batch)?);
1155        }
1156
1157        Ok(points)
1158    }
1159
1160    fn track_points_to_batch(points: Vec<&TrackPoint>) -> Result<RecordBatch> {
1161        let id: Int64Array = points.iter().map(|p| p.id).collect();
1162        let activity_id: Int64Array = points.iter().map(|p| p.activity_id).collect();
1163        let timestamp: TimestampMicrosecondArray = points
1164            .iter()
1165            .map(|p| Some(p.timestamp.timestamp_micros()))
1166            .collect();
1167        let lat: Float64Array = points.iter().map(|p| p.lat).collect();
1168        let lon: Float64Array = points.iter().map(|p| p.lon).collect();
1169        let elevation: Float64Array = points.iter().map(|p| p.elevation).collect();
1170        let heart_rate: Int32Array = points.iter().map(|p| p.heart_rate).collect();
1171        let cadence: Int32Array = points.iter().map(|p| p.cadence).collect();
1172        let power: Int32Array = points.iter().map(|p| p.power).collect();
1173        let speed: Float64Array = points.iter().map(|p| p.speed).collect();
1174
1175        let schema = Arc::new(Schema::new(vec![
1176            Field::new("id", DataType::Int64, true),
1177            Field::new("activity_id", DataType::Int64, false),
1178            Field::new(
1179                "timestamp",
1180                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1181                false,
1182            ),
1183            Field::new("lat", DataType::Float64, true),
1184            Field::new("lon", DataType::Float64, true),
1185            Field::new("elevation", DataType::Float64, true),
1186            Field::new("heart_rate", DataType::Int32, true),
1187            Field::new("cadence", DataType::Int32, true),
1188            Field::new("power", DataType::Int32, true),
1189            Field::new("speed", DataType::Float64, true),
1190        ]));
1191
1192        RecordBatch::try_new(
1193            schema,
1194            vec![
1195                Arc::new(id),
1196                Arc::new(activity_id),
1197                Arc::new(timestamp),
1198                Arc::new(lat),
1199                Arc::new(lon),
1200                Arc::new(elevation),
1201                Arc::new(heart_rate),
1202                Arc::new(cadence),
1203                Arc::new(power),
1204                Arc::new(speed),
1205            ],
1206        )
1207        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1208    }
1209
1210    fn batch_to_track_points(batch: &RecordBatch) -> Result<Vec<TrackPoint>> {
1211        let len = batch.num_rows();
1212        let mut points = Vec::with_capacity(len);
1213
1214        let id = batch
1215            .column(0)
1216            .as_any()
1217            .downcast_ref::<Int64Array>()
1218            .unwrap();
1219        let activity_id = batch
1220            .column(1)
1221            .as_any()
1222            .downcast_ref::<Int64Array>()
1223            .unwrap();
1224        let timestamp = batch
1225            .column(2)
1226            .as_any()
1227            .downcast_ref::<TimestampMicrosecondArray>()
1228            .unwrap();
1229        let lat = batch
1230            .column(3)
1231            .as_any()
1232            .downcast_ref::<Float64Array>()
1233            .unwrap();
1234        let lon = batch
1235            .column(4)
1236            .as_any()
1237            .downcast_ref::<Float64Array>()
1238            .unwrap();
1239        let elevation = batch
1240            .column(5)
1241            .as_any()
1242            .downcast_ref::<Float64Array>()
1243            .unwrap();
1244        let heart_rate = batch
1245            .column(6)
1246            .as_any()
1247            .downcast_ref::<Int32Array>()
1248            .unwrap();
1249        let cadence = batch
1250            .column(7)
1251            .as_any()
1252            .downcast_ref::<Int32Array>()
1253            .unwrap();
1254        let power = batch
1255            .column(8)
1256            .as_any()
1257            .downcast_ref::<Int32Array>()
1258            .unwrap();
1259        let speed = batch
1260            .column(9)
1261            .as_any()
1262            .downcast_ref::<Float64Array>()
1263            .unwrap();
1264
1265        for i in 0..len {
1266            points.push(TrackPoint {
1267                id: id.is_valid(i).then(|| id.value(i)),
1268                activity_id: activity_id.value(i),
1269                timestamp: DateTime::from_timestamp_micros(timestamp.value(i)).unwrap_or_default(),
1270                lat: lat.is_valid(i).then(|| lat.value(i)),
1271                lon: lon.is_valid(i).then(|| lon.value(i)),
1272                elevation: elevation.is_valid(i).then(|| elevation.value(i)),
1273                heart_rate: heart_rate.is_valid(i).then(|| heart_rate.value(i)),
1274                cadence: cadence.is_valid(i).then(|| cadence.value(i)),
1275                power: power.is_valid(i).then(|| power.value(i)),
1276                speed: speed.is_valid(i).then(|| speed.value(i)),
1277            });
1278        }
1279
1280        Ok(points)
1281    }
1282
1283    // =========================================================================
1284    // Performance Metrics
1285    // =========================================================================
1286
1287    /// Upsert performance metrics records
1288    pub fn upsert_performance_metrics(&self, records: &[PerformanceMetrics]) -> Result<()> {
1289        self.ensure_dir(EntityType::PerformanceMetrics)?;
1290
1291        // Group by partition key
1292        let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
1293            std::collections::HashMap::new();
1294
1295        for record in records {
1296            let key = EntityType::PerformanceMetrics.partition_key(record.date);
1297            partitions.entry(key).or_default().push(record.clone());
1298        }
1299
1300        // For each partition, read existing and merge
1301        for (key, mut new_records) in partitions {
1302            let path = self.partition_path(EntityType::PerformanceMetrics, &key);
1303
1304            // Read existing
1305            let mut existing = self.read_performance_metrics_from_path(&path)?;
1306
1307            // Create set of new dates for fast lookup
1308            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1309                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1310
1311            // Keep existing records that aren't being replaced
1312            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1313
1314            // Merge
1315            existing.append(&mut new_records);
1316
1317            // Sort by date for consistent ordering
1318            existing.sort_by_key(|r| (r.profile_id, r.date));
1319
1320            // Write merged
1321            let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
1322            let batch = Self::performance_metrics_to_batch(refs)?;
1323            self.write_batch(&path, &batch)?;
1324        }
1325
1326        Ok(())
1327    }
1328
1329    /// Upsert performance metrics records with partition-level locking for concurrent writes
1330    pub async fn upsert_performance_metrics_async(
1331        &self,
1332        records: &[PerformanceMetrics],
1333    ) -> Result<()> {
1334        self.ensure_dir(EntityType::PerformanceMetrics)?;
1335
1336        // Group by partition key
1337        let mut partitions: std::collections::HashMap<String, Vec<PerformanceMetrics>> =
1338            std::collections::HashMap::new();
1339
1340        for record in records {
1341            let key = EntityType::PerformanceMetrics.partition_key(record.date);
1342            partitions.entry(key).or_default().push(record.clone());
1343        }
1344
1345        // For each partition, acquire lock then read/merge/write
1346        for (key, mut new_records) in partitions {
1347            // Acquire partition lock
1348            let lock = self.get_partition_lock(&key);
1349            let _guard = lock.lock().await;
1350
1351            let path = self.partition_path(EntityType::PerformanceMetrics, &key);
1352
1353            // Read existing
1354            let mut existing = self.read_performance_metrics_from_path(&path)?;
1355
1356            // Create set of new dates for fast lookup
1357            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1358                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1359
1360            // Keep existing records that aren't being replaced
1361            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1362
1363            // Merge
1364            existing.append(&mut new_records);
1365
1366            // Sort by date for consistent ordering
1367            existing.sort_by_key(|r| (r.profile_id, r.date));
1368
1369            // Write merged
1370            let refs: Vec<&PerformanceMetrics> = existing.iter().collect();
1371            let batch = Self::performance_metrics_to_batch(refs)?;
1372            self.write_batch(&path, &batch)?;
1373        }
1374
1375        Ok(())
1376    }
1377
1378    fn read_performance_metrics_from_path(&self, path: &Path) -> Result<Vec<PerformanceMetrics>> {
1379        let batches = self.read_batches(path)?;
1380        let mut records = Vec::new();
1381
1382        for batch in batches {
1383            records.extend(Self::batch_to_performance_metrics(&batch)?);
1384        }
1385
1386        Ok(records)
1387    }
1388
1389    fn performance_metrics_to_batch(records: Vec<&PerformanceMetrics>) -> Result<RecordBatch> {
1390        let id: Int64Array = records.iter().map(|r| r.id).collect();
1391        let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
1392        let date: Date32Array = records
1393            .iter()
1394            .map(|r| {
1395                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1396                Some((r.date - epoch).num_days() as i32)
1397            })
1398            .collect();
1399        let vo2max: Float64Array = records.iter().map(|r| r.vo2max).collect();
1400        let fitness_age: Int32Array = records.iter().map(|r| r.fitness_age).collect();
1401        let training_readiness: Int32Array = records.iter().map(|r| r.training_readiness).collect();
1402        let training_status: StringArray = records
1403            .iter()
1404            .map(|r| r.training_status.as_deref())
1405            .collect();
1406        let lactate_threshold_hr: Int32Array =
1407            records.iter().map(|r| r.lactate_threshold_hr).collect();
1408        let lactate_threshold_pace: Float64Array =
1409            records.iter().map(|r| r.lactate_threshold_pace).collect();
1410        let race_5k_sec: Int32Array = records.iter().map(|r| r.race_5k_sec).collect();
1411        let race_10k_sec: Int32Array = records.iter().map(|r| r.race_10k_sec).collect();
1412        let race_half_sec: Int32Array = records.iter().map(|r| r.race_half_sec).collect();
1413        let race_marathon_sec: Int32Array = records.iter().map(|r| r.race_marathon_sec).collect();
1414        let endurance_score: Int32Array = records.iter().map(|r| r.endurance_score).collect();
1415        let hill_score: Int32Array = records.iter().map(|r| r.hill_score).collect();
1416        let raw_json: StringArray = records
1417            .iter()
1418            .map(|r| r.raw_json.as_ref().map(|j| j.to_string()))
1419            .collect();
1420
1421        let schema = Arc::new(Schema::new(vec![
1422            Field::new("id", DataType::Int64, true),
1423            Field::new("profile_id", DataType::Int32, false),
1424            Field::new("date", DataType::Date32, false),
1425            Field::new("vo2max", DataType::Float64, true),
1426            Field::new("fitness_age", DataType::Int32, true),
1427            Field::new("training_readiness", DataType::Int32, true),
1428            Field::new("training_status", DataType::Utf8, true),
1429            Field::new("lactate_threshold_hr", DataType::Int32, true),
1430            Field::new("lactate_threshold_pace", DataType::Float64, true),
1431            Field::new("race_5k_sec", DataType::Int32, true),
1432            Field::new("race_10k_sec", DataType::Int32, true),
1433            Field::new("race_half_sec", DataType::Int32, true),
1434            Field::new("race_marathon_sec", DataType::Int32, true),
1435            Field::new("endurance_score", DataType::Int32, true),
1436            Field::new("hill_score", DataType::Int32, true),
1437            Field::new("raw_json", DataType::Utf8, true),
1438        ]));
1439
1440        RecordBatch::try_new(
1441            schema,
1442            vec![
1443                Arc::new(id),
1444                Arc::new(profile_id),
1445                Arc::new(date),
1446                Arc::new(vo2max),
1447                Arc::new(fitness_age),
1448                Arc::new(training_readiness),
1449                Arc::new(training_status),
1450                Arc::new(lactate_threshold_hr),
1451                Arc::new(lactate_threshold_pace),
1452                Arc::new(race_5k_sec),
1453                Arc::new(race_10k_sec),
1454                Arc::new(race_half_sec),
1455                Arc::new(race_marathon_sec),
1456                Arc::new(endurance_score),
1457                Arc::new(hill_score),
1458                Arc::new(raw_json),
1459            ],
1460        )
1461        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1462    }
1463
1464    fn batch_to_performance_metrics(batch: &RecordBatch) -> Result<Vec<PerformanceMetrics>> {
1465        let len = batch.num_rows();
1466        let mut records = Vec::with_capacity(len);
1467
1468        let id = batch
1469            .column(0)
1470            .as_any()
1471            .downcast_ref::<Int64Array>()
1472            .unwrap();
1473        let profile_id = batch
1474            .column(1)
1475            .as_any()
1476            .downcast_ref::<Int32Array>()
1477            .unwrap();
1478        let date = batch
1479            .column(2)
1480            .as_any()
1481            .downcast_ref::<Date32Array>()
1482            .unwrap();
1483        let vo2max = batch
1484            .column(3)
1485            .as_any()
1486            .downcast_ref::<Float64Array>()
1487            .unwrap();
1488        let fitness_age = batch
1489            .column(4)
1490            .as_any()
1491            .downcast_ref::<Int32Array>()
1492            .unwrap();
1493        let training_readiness = batch
1494            .column(5)
1495            .as_any()
1496            .downcast_ref::<Int32Array>()
1497            .unwrap();
1498        let training_status = batch
1499            .column(6)
1500            .as_any()
1501            .downcast_ref::<StringArray>()
1502            .unwrap();
1503        let lactate_threshold_hr = batch
1504            .column(7)
1505            .as_any()
1506            .downcast_ref::<Int32Array>()
1507            .unwrap();
1508        let lactate_threshold_pace = batch
1509            .column(8)
1510            .as_any()
1511            .downcast_ref::<Float64Array>()
1512            .unwrap();
1513        let race_5k_sec = batch
1514            .column(9)
1515            .as_any()
1516            .downcast_ref::<Int32Array>()
1517            .unwrap();
1518        let race_10k_sec = batch
1519            .column(10)
1520            .as_any()
1521            .downcast_ref::<Int32Array>()
1522            .unwrap();
1523        let race_half_sec = batch
1524            .column(11)
1525            .as_any()
1526            .downcast_ref::<Int32Array>()
1527            .unwrap();
1528        let race_marathon_sec = batch
1529            .column(12)
1530            .as_any()
1531            .downcast_ref::<Int32Array>()
1532            .unwrap();
1533        let endurance_score = batch
1534            .column(13)
1535            .as_any()
1536            .downcast_ref::<Int32Array>()
1537            .unwrap();
1538        let hill_score = batch
1539            .column(14)
1540            .as_any()
1541            .downcast_ref::<Int32Array>()
1542            .unwrap();
1543        let raw_json = batch
1544            .column(15)
1545            .as_any()
1546            .downcast_ref::<StringArray>()
1547            .unwrap();
1548
1549        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1550
1551        for i in 0..len {
1552            records.push(PerformanceMetrics {
1553                id: id.is_valid(i).then(|| id.value(i)),
1554                profile_id: profile_id.value(i),
1555                date: epoch + chrono::Duration::days(date.value(i) as i64),
1556                vo2max: vo2max.is_valid(i).then(|| vo2max.value(i)),
1557                fitness_age: fitness_age.is_valid(i).then(|| fitness_age.value(i)),
1558                training_readiness: training_readiness
1559                    .is_valid(i)
1560                    .then(|| training_readiness.value(i)),
1561                training_status: training_status
1562                    .is_valid(i)
1563                    .then(|| training_status.value(i).to_string()),
1564                lactate_threshold_hr: lactate_threshold_hr
1565                    .is_valid(i)
1566                    .then(|| lactate_threshold_hr.value(i)),
1567                lactate_threshold_pace: lactate_threshold_pace
1568                    .is_valid(i)
1569                    .then(|| lactate_threshold_pace.value(i)),
1570                race_5k_sec: race_5k_sec.is_valid(i).then(|| race_5k_sec.value(i)),
1571                race_10k_sec: race_10k_sec.is_valid(i).then(|| race_10k_sec.value(i)),
1572                race_half_sec: race_half_sec.is_valid(i).then(|| race_half_sec.value(i)),
1573                race_marathon_sec: race_marathon_sec
1574                    .is_valid(i)
1575                    .then(|| race_marathon_sec.value(i)),
1576                endurance_score: endurance_score
1577                    .is_valid(i)
1578                    .then(|| endurance_score.value(i)),
1579                hill_score: hill_score.is_valid(i).then(|| hill_score.value(i)),
1580                raw_json: raw_json
1581                    .is_valid(i)
1582                    .then(|| serde_json::from_str(raw_json.value(i)).unwrap_or_default()),
1583            });
1584        }
1585
1586        Ok(records)
1587    }
1588
1589    // =========================================================================
1590    // Weight
1591    // =========================================================================
1592
1593    /// Upsert weight entries
1594    pub fn upsert_weight(&self, records: &[WeightEntry]) -> Result<()> {
1595        self.ensure_dir(EntityType::Weight)?;
1596
1597        // Group by partition key
1598        let mut partitions: std::collections::HashMap<String, Vec<WeightEntry>> =
1599            std::collections::HashMap::new();
1600
1601        for record in records {
1602            let key = EntityType::Weight.partition_key(record.date);
1603            partitions.entry(key).or_default().push(record.clone());
1604        }
1605
1606        // For each partition, read existing and merge
1607        for (key, mut new_records) in partitions {
1608            let path = self.partition_path(EntityType::Weight, &key);
1609
1610            // Read existing
1611            let mut existing = self.read_weight_from_path(&path)?;
1612
1613            // Create set of new dates for fast lookup
1614            let new_dates: std::collections::HashSet<(i32, NaiveDate)> =
1615                new_records.iter().map(|r| (r.profile_id, r.date)).collect();
1616
1617            // Keep existing records that aren't being replaced
1618            existing.retain(|r| !new_dates.contains(&(r.profile_id, r.date)));
1619
1620            // Merge
1621            existing.append(&mut new_records);
1622
1623            // Sort by date for consistent ordering
1624            existing.sort_by_key(|r| (r.profile_id, r.date));
1625
1626            // Write merged
1627            let refs: Vec<&WeightEntry> = existing.iter().collect();
1628            let batch = Self::weight_to_batch(refs)?;
1629            self.write_batch(&path, &batch)?;
1630        }
1631
1632        Ok(())
1633    }
1634
1635    fn read_weight_from_path(&self, path: &Path) -> Result<Vec<WeightEntry>> {
1636        let batches = self.read_batches(path)?;
1637        let mut records = Vec::new();
1638
1639        for batch in batches {
1640            records.extend(Self::batch_to_weight(&batch)?);
1641        }
1642
1643        Ok(records)
1644    }
1645
1646    fn weight_to_batch(records: Vec<&WeightEntry>) -> Result<RecordBatch> {
1647        let id: Int64Array = records.iter().map(|r| r.id).collect();
1648        let profile_id: Int32Array = records.iter().map(|r| r.profile_id).collect();
1649        let date: Date32Array = records
1650            .iter()
1651            .map(|r| {
1652                let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1653                Some((r.date - epoch).num_days() as i32)
1654            })
1655            .collect();
1656        let weight_kg: Float64Array = records.iter().map(|r| r.weight_kg).collect();
1657        let bmi: Float64Array = records.iter().map(|r| r.bmi).collect();
1658        let body_fat_pct: Float64Array = records.iter().map(|r| r.body_fat_pct).collect();
1659        let muscle_mass_kg: Float64Array = records.iter().map(|r| r.muscle_mass_kg).collect();
1660
1661        let schema = Arc::new(Schema::new(vec![
1662            Field::new("id", DataType::Int64, true),
1663            Field::new("profile_id", DataType::Int32, false),
1664            Field::new("date", DataType::Date32, false),
1665            Field::new("weight_kg", DataType::Float64, true),
1666            Field::new("bmi", DataType::Float64, true),
1667            Field::new("body_fat_pct", DataType::Float64, true),
1668            Field::new("muscle_mass_kg", DataType::Float64, true),
1669        ]));
1670
1671        RecordBatch::try_new(
1672            schema,
1673            vec![
1674                Arc::new(id),
1675                Arc::new(profile_id),
1676                Arc::new(date),
1677                Arc::new(weight_kg),
1678                Arc::new(bmi),
1679                Arc::new(body_fat_pct),
1680                Arc::new(muscle_mass_kg),
1681            ],
1682        )
1683        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1684    }
1685
1686    fn batch_to_weight(batch: &RecordBatch) -> Result<Vec<WeightEntry>> {
1687        let len = batch.num_rows();
1688        let mut records = Vec::with_capacity(len);
1689
1690        let id = batch
1691            .column(0)
1692            .as_any()
1693            .downcast_ref::<Int64Array>()
1694            .unwrap();
1695        let profile_id = batch
1696            .column(1)
1697            .as_any()
1698            .downcast_ref::<Int32Array>()
1699            .unwrap();
1700        let date = batch
1701            .column(2)
1702            .as_any()
1703            .downcast_ref::<Date32Array>()
1704            .unwrap();
1705        let weight_kg = batch
1706            .column(3)
1707            .as_any()
1708            .downcast_ref::<Float64Array>()
1709            .unwrap();
1710        let bmi = batch
1711            .column(4)
1712            .as_any()
1713            .downcast_ref::<Float64Array>()
1714            .unwrap();
1715        let body_fat_pct = batch
1716            .column(5)
1717            .as_any()
1718            .downcast_ref::<Float64Array>()
1719            .unwrap();
1720        let muscle_mass_kg = batch
1721            .column(6)
1722            .as_any()
1723            .downcast_ref::<Float64Array>()
1724            .unwrap();
1725
1726        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1727
1728        for i in 0..len {
1729            records.push(WeightEntry {
1730                id: id.is_valid(i).then(|| id.value(i)),
1731                profile_id: profile_id.value(i),
1732                date: epoch + chrono::Duration::days(date.value(i) as i64),
1733                weight_kg: weight_kg.is_valid(i).then(|| weight_kg.value(i)),
1734                bmi: bmi.is_valid(i).then(|| bmi.value(i)),
1735                body_fat_pct: body_fat_pct.is_valid(i).then(|| body_fat_pct.value(i)),
1736                muscle_mass_kg: muscle_mass_kg.is_valid(i).then(|| muscle_mass_kg.value(i)),
1737            });
1738        }
1739
1740        Ok(records)
1741    }
1742
1743    // =========================================================================
1744    // Profiles
1745    // =========================================================================
1746
1747    /// Write profiles (overwrites entire file)
1748    pub fn write_profiles(&self, profiles: &[Profile]) -> Result<()> {
1749        fs::create_dir_all(&self.base_path)
1750            .map_err(|e| GarminError::Database(format!("Failed to create directory: {}", e)))?;
1751
1752        let path = self.partition_path(EntityType::Profiles, "");
1753        let refs: Vec<&Profile> = profiles.iter().collect();
1754        let batch = Self::profiles_to_batch(refs)?;
1755        self.write_batch(&path, &batch)?;
1756
1757        Ok(())
1758    }
1759
1760    /// Read all profiles
1761    pub fn read_profiles(&self) -> Result<Vec<Profile>> {
1762        let path = self.partition_path(EntityType::Profiles, "");
1763        let batches = self.read_batches(&path)?;
1764        let mut profiles = Vec::new();
1765
1766        for batch in batches {
1767            profiles.extend(Self::batch_to_profiles(&batch)?);
1768        }
1769
1770        Ok(profiles)
1771    }
1772
1773    fn profiles_to_batch(profiles: Vec<&Profile>) -> Result<RecordBatch> {
1774        let profile_id: Int32Array = profiles.iter().map(|p| p.profile_id).collect();
1775        let display_name: StringArray = profiles
1776            .iter()
1777            .map(|p| Some(p.display_name.as_str()))
1778            .collect();
1779        let user_id: Int64Array = profiles.iter().map(|p| p.user_id).collect();
1780        let created_at: TimestampMicrosecondArray = profiles
1781            .iter()
1782            .map(|p| p.created_at.map(|t| t.timestamp_micros()))
1783            .collect();
1784        let last_sync_at: TimestampMicrosecondArray = profiles
1785            .iter()
1786            .map(|p| p.last_sync_at.map(|t| t.timestamp_micros()))
1787            .collect();
1788
1789        let schema = Arc::new(Schema::new(vec![
1790            Field::new("profile_id", DataType::Int32, false),
1791            Field::new("display_name", DataType::Utf8, false),
1792            Field::new("user_id", DataType::Int64, true),
1793            Field::new(
1794                "created_at",
1795                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1796                true,
1797            ),
1798            Field::new(
1799                "last_sync_at",
1800                DataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, None),
1801                true,
1802            ),
1803        ]));
1804
1805        RecordBatch::try_new(
1806            schema,
1807            vec![
1808                Arc::new(profile_id),
1809                Arc::new(display_name),
1810                Arc::new(user_id),
1811                Arc::new(created_at),
1812                Arc::new(last_sync_at),
1813            ],
1814        )
1815        .map_err(|e| GarminError::Database(format!("Failed to create record batch: {}", e)))
1816    }
1817
1818    fn batch_to_profiles(batch: &RecordBatch) -> Result<Vec<Profile>> {
1819        let len = batch.num_rows();
1820        let mut profiles = Vec::with_capacity(len);
1821
1822        let profile_id = batch
1823            .column(0)
1824            .as_any()
1825            .downcast_ref::<Int32Array>()
1826            .unwrap();
1827        let display_name = batch
1828            .column(1)
1829            .as_any()
1830            .downcast_ref::<StringArray>()
1831            .unwrap();
1832        let user_id = batch
1833            .column(2)
1834            .as_any()
1835            .downcast_ref::<Int64Array>()
1836            .unwrap();
1837        let created_at = batch
1838            .column(3)
1839            .as_any()
1840            .downcast_ref::<TimestampMicrosecondArray>()
1841            .unwrap();
1842        let last_sync_at = batch
1843            .column(4)
1844            .as_any()
1845            .downcast_ref::<TimestampMicrosecondArray>()
1846            .unwrap();
1847
1848        for i in 0..len {
1849            profiles.push(Profile {
1850                profile_id: profile_id.value(i),
1851                display_name: display_name.value(i).to_string(),
1852                user_id: user_id.is_valid(i).then(|| user_id.value(i)),
1853                created_at: created_at.is_valid(i).then(|| {
1854                    DateTime::from_timestamp_micros(created_at.value(i)).unwrap_or_default()
1855                }),
1856                last_sync_at: last_sync_at.is_valid(i).then(|| {
1857                    DateTime::from_timestamp_micros(last_sync_at.value(i)).unwrap_or_default()
1858                }),
1859            });
1860        }
1861
1862        Ok(profiles)
1863    }
1864
1865    /// Get the base path for external readers to use with DuckDB glob queries
1866    pub fn base_path(&self) -> &Path {
1867        &self.base_path
1868    }
1869}
1870
#[cfg(test)]
mod tests {
    //! Round-trip and concurrency tests for `ParquetStore`. Each test runs
    //! against its own temporary directory.

    use super::*;
    use tempfile::TempDir;

    /// An activity written via `upsert_activities` can be read back from its partition.
    #[test]
    fn test_activity_round_trip() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        let activities = vec![Activity {
            activity_id: 123,
            profile_id: 1,
            activity_name: Some("Morning Run".to_string()),
            activity_type: Some("running".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            duration_sec: Some(3600.0),
            distance_m: Some(10000.0),
            calories: Some(500),
            avg_hr: Some(150),
            max_hr: Some(175),
            avg_speed: Some(2.78),
            max_speed: Some(3.5),
            elevation_gain: Some(100.0),
            elevation_loss: Some(100.0),
            avg_cadence: Some(180.0),
            avg_power: None,
            normalized_power: None,
            training_effect: Some(3.5),
            training_load: Some(120.0),
            start_lat: Some(37.7749),
            start_lon: Some(-122.4194),
            end_lat: Some(37.7849),
            end_lon: Some(-122.4094),
            ground_contact_time: Some(250.0),
            vertical_oscillation: Some(8.5),
            stride_length: Some(1.2),
            location_name: Some("San Francisco".to_string()),
            raw_json: None,
        }];

        store.upsert_activities(&activities).unwrap();

        // Read back
        let key = EntityType::Activities
            .partition_key(activities[0].start_time_local.unwrap().date_naive());
        let path = store.partition_path(EntityType::Activities, &key);
        let read_back = store.read_activities_from_path(&path).unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].activity_id, 123);
        assert_eq!(read_back[0].activity_name, Some("Morning Run".to_string()));
    }

    /// A daily health record survives a write/read cycle, including its date and text fields.
    #[test]
    fn test_daily_health_round_trip() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        let records = vec![DailyHealth {
            id: None,
            profile_id: 1,
            date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
            steps: Some(10000),
            step_goal: Some(8000),
            total_calories: Some(2500),
            active_calories: Some(500),
            bmr_calories: Some(2000),
            resting_hr: Some(55),
            sleep_seconds: Some(28800),
            deep_sleep_seconds: Some(7200),
            light_sleep_seconds: Some(14400),
            rem_sleep_seconds: Some(7200),
            sleep_score: Some(85),
            sleep_note: Some("Felt rested.".to_string()),
            avg_stress: Some(30),
            max_stress: Some(75),
            body_battery_start: Some(95),
            body_battery_end: Some(45),
            hrv_weekly_avg: Some(45),
            hrv_last_night: Some(48),
            hrv_status: Some("balanced".to_string()),
            avg_respiration: Some(14.5),
            avg_spo2: Some(96),
            lowest_spo2: Some(92),
            hydration_ml: Some(2500),
            moderate_intensity_min: Some(30),
            vigorous_intensity_min: Some(20),
            raw_json: None,
        }];

        store.upsert_daily_health(&records).unwrap();

        // Read back
        let key = EntityType::DailyHealth.partition_key(records[0].date);
        let path = store.partition_path(EntityType::DailyHealth, &key);
        let read_back = store.read_daily_health_from_path(&path).unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].steps, Some(10000));
        assert_eq!(read_back[0].sleep_note.as_deref(), Some("Felt rested."));
        assert_eq!(
            read_back[0].date,
            NaiveDate::from_ymd_opt(2024, 12, 15).unwrap()
        );
    }

    /// `has_daily_health` is false before an upsert and true after it.
    #[test]
    fn test_has_daily_health() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());
        let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();

        assert!(!store.has_daily_health(1, date).unwrap());

        let records = vec![DailyHealth {
            id: None,
            profile_id: 1,
            date,
            steps: Some(10000),
            step_goal: None,
            total_calories: None,
            active_calories: None,
            bmr_calories: None,
            resting_hr: None,
            sleep_seconds: None,
            deep_sleep_seconds: None,
            light_sleep_seconds: None,
            rem_sleep_seconds: None,
            sleep_score: None,
            sleep_note: None,
            avg_stress: None,
            max_stress: None,
            body_battery_start: None,
            body_battery_end: None,
            hrv_weekly_avg: None,
            hrv_last_night: None,
            hrv_status: None,
            avg_respiration: None,
            avg_spo2: None,
            lowest_spo2: None,
            hydration_ml: None,
            moderate_intensity_min: None,
            vigorous_intensity_min: None,
            raw_json: None,
        }];

        store.upsert_daily_health(&records).unwrap();
        assert!(store.has_daily_health(1, date).unwrap());
    }

    /// `has_track_points` is false before writing points for an activity and true after.
    #[test]
    fn test_has_track_points() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());
        let date = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();

        assert!(!store.has_track_points(42, date).unwrap());

        let points = vec![TrackPoint {
            id: None,
            activity_id: 42,
            timestamp: DateTime::from_timestamp(1703001600, 0).unwrap(),
            lat: Some(1.0),
            lon: Some(2.0),
            elevation: Some(3.0),
            heart_rate: None,
            cadence: None,
            power: None,
            speed: None,
        }];

        store.write_track_points(date, &points).unwrap();
        assert!(store.has_track_points(42, date).unwrap());
    }

    /// A reader of one partition and a writer to a different partition can run
    /// simultaneously without either failing.
    #[test]
    fn test_concurrent_read_write() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let temp = TempDir::new().unwrap();
        let store = Arc::new(ParquetStore::new(temp.path()));

        // Write initial data
        let initial_activities = vec![Activity {
            activity_id: 1,
            profile_id: 1,
            activity_name: Some("Initial Activity".to_string()),
            activity_type: Some("running".to_string()),
            start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
            duration_sec: Some(3600.0),
            distance_m: Some(10000.0),
            calories: None,
            avg_hr: None,
            max_hr: None,
            avg_speed: None,
            max_speed: None,
            elevation_gain: None,
            elevation_loss: None,
            avg_cadence: None,
            avg_power: None,
            normalized_power: None,
            training_effect: None,
            training_load: None,
            start_lat: None,
            start_lon: None,
            end_lat: None,
            end_lon: None,
            ground_contact_time: None,
            vertical_oscillation: None,
            stride_length: None,
            location_name: None,
            raw_json: None,
        }];
        store.upsert_activities(&initial_activities).unwrap();

        // Barrier to coordinate reader/writer threads
        let barrier = Arc::new(Barrier::new(2));
        let store_reader = Arc::clone(&store);
        let barrier_reader = Arc::clone(&barrier);

        // Reader thread: reads existing data while writer writes new data
        let reader_handle = thread::spawn(move || {
            barrier_reader.wait(); // Sync with writer

            // Read the existing file (timestamp 1703001600 falls on 2023-12-19 UTC)
            let key = EntityType::Activities
                .partition_key(NaiveDate::from_ymd_opt(2023, 12, 19).unwrap());
            let path = store_reader.partition_path(EntityType::Activities, &key);
            let activities = store_reader.read_activities_from_path(&path).unwrap();

            // Should see at least the initial activity
            assert!(!activities.is_empty(), "Reader should see initial data");
            activities.len()
        });

        // Writer thread: writes new data to a DIFFERENT partition
        let writer_handle = thread::spawn(move || {
            barrier.wait(); // Sync with reader

            // Write to a different partition (different week)
            let new_activities = vec![Activity {
                activity_id: 2,
                profile_id: 1,
                activity_name: Some("Concurrent Activity".to_string()),
                activity_type: Some("cycling".to_string()),
                start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()), // Different week
                start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
                duration_sec: Some(7200.0),
                distance_m: Some(50000.0),
                calories: None,
                avg_hr: None,
                max_hr: None,
                avg_speed: None,
                max_speed: None,
                elevation_gain: None,
                elevation_loss: None,
                avg_cadence: None,
                avg_power: None,
                normalized_power: None,
                training_effect: None,
                training_load: None,
                start_lat: None,
                start_lon: None,
                end_lat: None,
                end_lon: None,
                ground_contact_time: None,
                vertical_oscillation: None,
                stride_length: None,
                location_name: None,
                raw_json: None,
            }];
            store.upsert_activities(&new_activities).unwrap();
            2 // Return activity id written
        });

        // Wait for both threads to complete
        let read_count = reader_handle.join().expect("Reader thread panicked");
        let written_id = writer_handle.join().expect("Writer thread panicked");

        assert_eq!(read_count, 1, "Reader should have read 1 activity");
        assert_eq!(written_id, 2, "Writer should have written activity 2");
    }

    /// DuckDB can query every activity partition at once through a glob pattern.
    #[test]
    fn test_duckdb_glob_query() {
        use duckdb::Connection;

        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());

        // Write activities to multiple partitions
        let activities = vec![
            Activity {
                activity_id: 1,
                profile_id: 1,
                activity_name: Some("Week 51 Run".to_string()),
                activity_type: Some("running".to_string()),
                start_time_local: Some(DateTime::from_timestamp(1703001600, 0).unwrap()), // 2023-W51
                start_time_gmt: Some(DateTime::from_timestamp(1703001600, 0).unwrap()),
                duration_sec: Some(3600.0),
                distance_m: Some(10000.0),
                calories: None,
                avg_hr: None,
                max_hr: None,
                avg_speed: None,
                max_speed: None,
                elevation_gain: None,
                elevation_loss: None,
                avg_cadence: None,
                avg_power: None,
                normalized_power: None,
                training_effect: None,
                training_load: None,
                start_lat: None,
                start_lon: None,
                end_lat: None,
                end_lon: None,
                ground_contact_time: None,
                vertical_oscillation: None,
                stride_length: None,
                location_name: None,
                raw_json: None,
            },
            Activity {
                activity_id: 2,
                profile_id: 1,
                activity_name: Some("Week 52 Ride".to_string()),
                activity_type: Some("cycling".to_string()),
                start_time_local: Some(DateTime::from_timestamp(1703606400, 0).unwrap()), // 2023-W52
                start_time_gmt: Some(DateTime::from_timestamp(1703606400, 0).unwrap()),
                duration_sec: Some(7200.0),
                distance_m: Some(50000.0),
                calories: None,
                avg_hr: None,
                max_hr: None,
                avg_speed: None,
                max_speed: None,
                elevation_gain: None,
                elevation_loss: None,
                avg_cadence: None,
                avg_power: None,
                normalized_power: None,
                training_effect: None,
                training_load: None,
                start_lat: None,
                start_lon: None,
                end_lat: None,
                end_lon: None,
                ground_contact_time: None,
                vertical_oscillation: None,
                stride_length: None,
                location_name: None,
                raw_json: None,
            },
        ];
        store.upsert_activities(&activities).unwrap();

        // Use DuckDB to query all Parquet files with glob pattern
        let conn = Connection::open_in_memory().unwrap();
        let glob_pattern = format!("{}/*.parquet", temp.path().join("activities").display());

        let mut stmt = conn
            .prepare(&format!(
                "SELECT activity_id, activity_name FROM '{}' ORDER BY activity_id",
                glob_pattern
            ))
            .unwrap();

        // Surface row-decoding errors directly instead of silently dropping rows
        // (which would only show up as a confusing length mismatch below).
        let results: Vec<(i64, String)> = stmt
            .query_map([], |row| {
                Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
            })
            .unwrap()
            .collect::<std::result::Result<_, _>>()
            .expect("failed to decode DuckDB row");

        // Should see both activities from different partitions
        assert_eq!(results.len(), 2);
        assert_eq!(results[0], (1, "Week 51 Run".to_string()));
        assert_eq!(results[1], (2, "Week 52 Ride".to_string()));
    }

    /// Upserting a weight entry for an already-stored (profile, day) replaces the
    /// stored row instead of adding a second one.
    #[test]
    fn test_weight_upsert_replaces_same_day() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());
        let day = NaiveDate::from_ymd_opt(2024, 12, 15).unwrap();

        let entry = |weight_kg: f64| WeightEntry {
            id: None,
            profile_id: 1,
            date: day,
            weight_kg: Some(weight_kg),
            bmi: None,
            body_fat_pct: None,
            muscle_mass_kg: None,
        };

        store.upsert_weight(&[entry(80.0)]).unwrap();
        store.upsert_weight(&[entry(79.5)]).unwrap();

        let key = EntityType::Weight.partition_key(day);
        let path = store.partition_path(EntityType::Weight, &key);
        let read_back = store.read_weight_from_path(&path).unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].profile_id, 1);
        assert_eq!(read_back[0].date, day);
        assert_eq!(read_back[0].weight_kg, Some(79.5));
        assert_eq!(read_back[0].bmi, None);
    }

    /// Profiles survive a write/read cycle, including nullable columns.
    #[test]
    fn test_profiles_round_trip() {
        let temp = TempDir::new().unwrap();
        let store = ParquetStore::new(temp.path());
        let created = DateTime::from_timestamp(1703001600, 0).unwrap();

        let profiles = vec![Profile {
            profile_id: 1,
            display_name: "Runner".to_string(),
            user_id: Some(42),
            created_at: Some(created),
            last_sync_at: None,
        }];

        store.write_profiles(&profiles).unwrap();
        let read_back = store.read_profiles().unwrap();

        assert_eq!(read_back.len(), 1);
        assert_eq!(read_back[0].profile_id, 1);
        assert_eq!(read_back[0].display_name, "Runner");
        assert_eq!(read_back[0].user_id, Some(42));
        assert_eq!(read_back[0].created_at, Some(created));
        assert_eq!(read_back[0].last_sync_at, None);
    }
}