mod timon_engine;
use chrono::{DateTime, Duration, Local, Utc};
use serde_json::json;
use std::time::Instant;
pub use timon_engine::{
cloud_fetch_parquet, cloud_fetch_parquet_batch, cloud_sink_parquet, cloud_sync_parquet, create_database, create_table, delete_database,
delete_table, init_bucket, init_timon, insert, list_databases, list_tables, query, query_df,
};
#[cfg(feature = "dev_cli")]
mod cli;
#[cfg(feature = "cloud_server")]
mod server;
#[cfg(feature = "cloud_server")]
mod cloud_server {
use crate::server::timon_server;
use std::io;
#[actix_web::main]
pub async fn main() -> io::Result<()> {
timon_server().await
}
}
#[cfg(feature = "dev_cli")]
mod dev_cli {
use crate::cli::{convert_json_to_parquet, execute_query, Commands, CLI};
use clap::Parser;
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = CLI::parse();
match &cli.command {
Commands::Convert { input, output } => {
convert_json_to_parquet(input.as_str(), output.as_str())?;
println!("JSON converted to Parquet successfully.");
}
Commands::Query { file, query } => {
execute_query(file.as_str(), query.as_str()).await?;
}
}
Ok(())
}
}
#[cfg(feature = "cloud_server")]
fn main() {
if let Err(e) = cloud_server::main() {
eprintln!("Failed to start the cloud server: {}", e);
}
}
#[cfg(feature = "dev_cli")]
fn main() {
if let Err(e) = dev_cli::main() {
eprintln!("Failed to build the CLI tool: {}", e);
}
}
#[allow(dead_code)]
async fn test_concurrent_inserts() -> Result<(), Box<dyn std::error::Error>> {
println!("\n=== TESTING CONCURRENT INSERTS TO SAME PARTITION ===");
println!("This test will run multiple times to check for non-deterministic corruption...");
let mut corruption_detected = false;
let result = test_concurrent_inserts_single_run().await;
match result {
Ok(had_corruption) => {
if had_corruption {
corruption_detected = true;
println!("\n🔴 CORRUPTION DETECTED");
} else {
println!("\n✅ No corruption");
}
}
Err(e) => {
eprintln!("\n❌ Error: {}", e);
}
}
println!("\n{}", "=".repeat(60));
println!("FINAL RESULTS: Corruption detected: {}", corruption_detected);
if corruption_detected {
println!("🔴 CONCURRENT INSERT CORRUPTION CONFIRMED!");
} else {
println!("✅ No corruption detected");
}
Ok(())
}
#[allow(dead_code)]
async fn test_concurrent_inserts_single_run() -> Result<bool, Box<dyn std::error::Error>> {
use std::sync::Arc;
use std::thread;
println!("\n=== TESTING CONCURRENT INSERTS TO SAME PARTITION ===");
const STORAGE_PATH: &str = "tmp_concurrent_test";
const USERNAME: &str = "concurrent_test";
const DATABASE_NAME: &str = "test_db";
const TABLE_NAME: &str = "test_table";
const BUCKET_INTERVAL: u32 = 43200;
println!("Cleaning up any existing test data...");
if std::path::Path::new(STORAGE_PATH).exists() {
if let Err(e) = std::fs::remove_dir_all(STORAGE_PATH) {
eprintln!("Warning: Failed to remove storage directory: {}", e);
}
}
std::thread::sleep(std::time::Duration::from_millis(200));
println!("Initializing timon with monthly partitioning (43200)...");
let _ = init_timon(STORAGE_PATH, BUCKET_INTERVAL, USERNAME).unwrap();
let _ = create_database(DATABASE_NAME);
let table_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"value": {
"type": "int",
"required": true
},
"thread_id": {
"type": "int"
}
}
"#;
let _ = create_table(DATABASE_NAME, TABLE_NAME, &table_schema);
println!("Table created successfully");
std::thread::sleep(std::time::Duration::from_millis(200));
let metadata_path = format!("{}/metadata.json", STORAGE_PATH);
for attempt in 1..=5 {
if let Ok(contents) = std::fs::read_to_string(&metadata_path) {
if contents.contains(TABLE_NAME) && contents.contains(DATABASE_NAME) {
println!("✅ Metadata file verified (attempt {})", attempt);
break;
} else {
println!("⚠️ Metadata file exists but doesn't contain table (attempt {})", attempt);
}
} else {
println!("⚠️ Metadata file not found yet (attempt {})", attempt);
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
std::thread::sleep(std::time::Duration::from_millis(200));
println!("Verifying table exists...");
let mut verified = false;
for attempt in 1..=5 {
match list_tables(DATABASE_NAME) {
Ok(tables_result) => {
let tables_vec = tables_result["json_value"]
.as_array()
.and_then(|arr| Some(arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect::<Vec<_>>()))
.unwrap_or_default();
if tables_vec.contains(&TABLE_NAME.to_string()) {
println!("✅ Table verified: {} (attempt {})", TABLE_NAME, attempt);
verified = true;
break;
} else {
println!("⚠️ Table not found yet, attempt {}... Available: {:?}", attempt, tables_vec);
}
}
Err(e) => {
println!("⚠️ Error listing tables, attempt {}: {}", attempt, e);
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
if !verified {
return Err(format!("Table '{}' was not found after multiple attempts!", TABLE_NAME).into());
}
let _ = list_tables(DATABASE_NAME);
std::thread::sleep(std::time::Duration::from_millis(100));
const NUM_THREADS: usize = 10;
const RECORDS_PER_THREAD: usize = 50;
const TOTAL_EXPECTED_RECORDS: usize = NUM_THREADS * RECORDS_PER_THREAD;
println!(
"\nSpawning {} threads, each inserting {} records to the same partition",
NUM_THREADS, RECORDS_PER_THREAD
);
println!("Expected total records: {}", TOTAL_EXPECTED_RECORDS);
let start_time = std::time::Instant::now();
let base_timestamp = chrono::Utc::now().timestamp();
let insert_attempts = Arc::new(std::sync::Mutex::new(0usize));
let insert_successes = Arc::new(std::sync::Mutex::new(0usize));
let insert_errors = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
let mut handles = vec![];
for thread_id in 0..NUM_THREADS {
let insert_attempts_clone = Arc::clone(&insert_attempts);
let insert_successes_clone = Arc::clone(&insert_successes);
let insert_errors_clone = Arc::clone(&insert_errors);
let base_timestamp_clone = base_timestamp;
let bucket_interval = BUCKET_INTERVAL;
let storage_path = STORAGE_PATH.to_string();
let username = USERNAME.to_string();
let db_name = DATABASE_NAME.to_string();
let table_name = TABLE_NAME.to_string();
let handle = thread::spawn(move || {
use timon_engine::db_manager::DatabaseManager;
let mut db_manager = DatabaseManager::new(
&storage_path,
bucket_interval, &username,
);
let mut records = Vec::new();
for i in 0..RECORDS_PER_THREAD {
let offset_seconds = (thread_id * RECORDS_PER_THREAD + i) as i64;
let record_timestamp = base_timestamp_clone + offset_seconds;
let record_date = chrono::DateTime::<Utc>::from_timestamp(record_timestamp, 0)
.unwrap()
.format("%Y.%m.%d %H:%M:%S")
.to_string();
let record = json!({
"date": record_date,
"value": thread_id * 1000 + i,
"thread_id": thread_id
});
records.push(record);
}
let json_data = serde_json::to_string(&records).unwrap();
{
let mut count = insert_attempts_clone.lock().unwrap();
*count += RECORDS_PER_THREAD;
}
let insert_start = std::time::Instant::now();
match db_manager.insert(&db_name, &table_name, &json_data) {
Ok(_) => {
let insert_duration = insert_start.elapsed();
let mut count = insert_successes_clone.lock().unwrap();
*count += RECORDS_PER_THREAD;
if thread_id == 0 && *count == RECORDS_PER_THREAD {
eprintln!("Thread 0: First insert completed in {:.3}s", insert_duration.as_secs_f64());
}
}
Err(e) => {
let error_msg = format!("Thread {}: {}", thread_id, e);
eprintln!("{}", error_msg);
let mut errors = insert_errors_clone.lock().unwrap();
errors.push(error_msg);
}
}
});
handles.push(handle);
}
println!("Waiting for all threads to complete...");
for (idx, handle) in handles.into_iter().enumerate() {
match handle.join() {
Ok(_) => {
}
Err(e) => {
eprintln!("⚠️ Thread {} panicked: {:?}", idx, e);
}
}
}
let elapsed = start_time.elapsed();
println!("All threads completed in {:.2} seconds", elapsed.as_secs_f64());
let attempts = *insert_attempts.lock().unwrap();
let successes = *insert_successes.lock().unwrap();
let errors = insert_errors.lock().unwrap().clone();
std::thread::sleep(std::time::Duration::from_millis(300));
println!("\n=== INSERTION RESULTS ===");
println!("Total insertion attempts: {}", attempts);
println!("Reported successful insertions: {}", successes);
println!("Reported failed insertions: {}", errors.len());
if !errors.is_empty() {
println!("\nError details:");
for error in errors.iter().take(5) {
println!(" - {}", error);
}
if errors.len() > 5 {
println!(" ... and {} more errors", errors.len() - 5);
}
}
println!("\n=== FILESYSTEM STATE CHECK ===");
let table_path = format!("{}/data/{}/{}", STORAGE_PATH, DATABASE_NAME, TABLE_NAME);
if std::path::Path::new(&table_path).exists() {
println!("✅ Table directory exists: {}", table_path);
let partition_count = std::fs::read_dir(&table_path)
.map(|entries| entries.filter_map(|e| e.ok()).filter(|e| e.path().is_dir()).count())
.unwrap_or(0);
println!(" Found {} partition directories", partition_count);
let mut parquet_files = 0;
let mut total_size = 0u64;
if let Ok(entries) = std::fs::read_dir(&table_path) {
for entry in entries.flatten() {
if entry.path().is_dir() {
let parquet_file = entry.path().join("data.parquet");
if parquet_file.exists() {
parquet_files += 1;
if let Ok(metadata) = std::fs::metadata(&parquet_file) {
total_size += metadata.len();
}
}
}
}
}
println!(" Found {} parquet files, total size: {} bytes", parquet_files, total_size);
} else {
println!("⚠️ Table directory does NOT exist: {}", table_path);
}
println!("\n=== QUERYING ACTUAL RECORDS ===");
let query_result = query(DATABASE_NAME, "SELECT COUNT(*) as total FROM test_table", None, None).await?;
let actual_count = query_result["json_value"]
.as_array()
.and_then(|arr| arr.get(0))
.and_then(|obj| obj.get("total"))
.and_then(|v| v.as_u64())
.unwrap_or(0) as usize;
println!("Expected records: {}", TOTAL_EXPECTED_RECORDS);
println!("Actual records in database: {}", actual_count);
let all_records_query = query(DATABASE_NAME, "SELECT * FROM test_table ORDER BY value", None, None).await?;
let empty_array = vec![];
let all_records = all_records_query["json_value"].as_array().unwrap_or(&empty_array);
println!("Records retrieved from query: {}", all_records.len());
let mut seen_values = std::collections::HashSet::new();
let mut duplicates = 0;
let mut missing_threads = std::collections::HashSet::new();
for record in all_records {
if let Some(value) = record.get("value").and_then(|v| v.as_u64()) {
if !seen_values.insert(value) {
duplicates += 1;
println!(" ⚠️ Duplicate value found: {}", value);
}
}
if let Some(thread_id) = record.get("thread_id").and_then(|v| v.as_u64()) {
missing_threads.insert(thread_id);
}
}
println!("\n=== DATA INTEGRITY CHECK ===");
println!("Duplicate values found: {}", duplicates);
println!("Threads with data present: {}/{}", missing_threads.len(), NUM_THREADS);
let data_loss = TOTAL_EXPECTED_RECORDS.saturating_sub(actual_count);
let loss_percentage = if TOTAL_EXPECTED_RECORDS > 0 {
(data_loss as f64 / TOTAL_EXPECTED_RECORDS as f64) * 100.0
} else {
0.0
};
println!("\n=== CORRUPTION ANALYSIS ===");
let had_corruption: bool = if actual_count < TOTAL_EXPECTED_RECORDS {
println!("🔴 DATA LOSS DETECTED!");
println!(" Lost {} records ({:.2}% loss)", data_loss, loss_percentage);
println!(" This confirms concurrent insert corruption!");
true
} else if duplicates > 0 {
println!("🟡 DATA INTEGRITY ISSUE!");
println!(" Found {} duplicate values", duplicates);
true
} else if actual_count == TOTAL_EXPECTED_RECORDS {
println!("✅ All records present");
false
} else {
false
};
println!("\n=== PARQUET FILE INTEGRITY CHECK ===");
let table_path = format!("{}/data/{}/{}", STORAGE_PATH, DATABASE_NAME, TABLE_NAME);
println!("Table path: {}", table_path);
use timon_engine::helpers::rounded_timestamp;
let expected_partition = rounded_timestamp(base_timestamp, BUCKET_INTERVAL);
println!("Expected partition for all records: partition_date={}", expected_partition);
let partition_dir = std::fs::read_dir(&table_path).ok();
if let Some(entries) = partition_dir {
let mut partition_count = 0;
for entry in entries.flatten() {
if entry.path().is_dir() {
partition_count += 1;
let entry_path = entry.path();
let partition_name = entry_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown");
println!("\nPartition found: {}", partition_name);
let parquet_file = entry_path.join("data.parquet");
if parquet_file.exists() {
println!(" Parquet file: {}", parquet_file.display());
match std::fs::File::open(&parquet_file) {
Ok(_) => {
let file_size = std::fs::metadata(&parquet_file).map(|m| m.len()).unwrap_or(0);
println!(" File size: {} bytes", file_size);
if file_size == 0 {
println!(" ⚠️ WARNING: File is empty (possible corruption)");
} else {
use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
match SerializedFileReader::new(std::fs::File::open(&parquet_file)?) {
Ok(reader) => {
let metadata = reader.metadata();
let num_rows = metadata.file_metadata().num_rows();
println!(" ✅ File is readable, {} rows in metadata", num_rows);
}
Err(e) => {
println!(" 🔴 CORRUPTION DETECTED: Cannot read parquet file: {}", e);
}
}
}
}
Err(e) => {
println!(" 🔴 ERROR: Cannot open file: {}", e);
}
}
} else {
println!(" ⚠️ No parquet file found in partition");
}
}
}
if partition_count == 0 {
println!(" ⚠️ No partitions found in table directory");
}
} else {
println!(" ⚠️ Cannot read table directory: {}", table_path);
}
Ok(had_corruption)
}
#[allow(dead_code)]
async fn test_local_storage() {
const STORAGE_PATH: &str = "tmp";
const USERNAME: &str = "ahmed_test";
let timon_result = init_timon(STORAGE_PATH, 5, USERNAME).unwrap();
println!("init_timon -> {}", timon_result);
const DATABASE_NAME: &str = "zivaring";
const TABLE_NAME: &str = "activitydetails";
let database_result = create_database(DATABASE_NAME);
println!("create_database -> {}", database_result.unwrap());
let table_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"distance": {
"type": "float",
"max": 2500
},
"step": {
"type": "int",
"min": 10,
"max": 100
},
"calories": {
"type": "float",
"min": 50,
"max": 1200
},
"arraySteps": {
"type": "array"
},
"is_sync": {
"type": "bool"
}
}
"#;
let table_result = create_table(DATABASE_NAME, TABLE_NAME, &table_schema);
println!("create_table -> {}", table_result.unwrap());
let databases_list: serde_json::Value = list_databases().unwrap();
let tables_list = list_tables(DATABASE_NAME).unwrap();
println!("databases_list -> {:?}", databases_list);
println!("tables_list -> {:?}", tables_list);
struct DataPoint {
date: String,
array_steps: Vec<i32>,
calories: i32,
distance: f64,
step: i32,
}
fn generate_data(n: usize) -> String {
let start_time = Local::now().naive_local() - Duration::hours(12); let mut data = Vec::new();
let mut time_counter = 0;
for i in 0..n {
time_counter += 1000;
let date = start_time + Duration::milliseconds(time_counter);
let array_steps: Vec<i32> = (0..10).map(|x| (i as i32 + x) % 50).collect();
let calories = (i % 50) + 1;
let distance = (i as f64 * 0.01) % 5.0;
let step = array_steps.iter().sum::<i32>();
data.push(json!({
"date": date.format("%Y.%m.%d %H:%M:%S").to_string(),
"arraySteps": array_steps,
"calories": calories,
"distance": distance,
"step": step
}));
}
serde_json::to_string_pretty(&data).unwrap()
}
let json_data: String = r#"
[
{"date":"2025.02.10 10:00:00","arraySteps":[18,0,0,20,0,0,0,0,0,0],"calories":111,"distance":0.01,"step":10},
{"date":"2025.02.10 10:01:00","arraySteps":[43,39,0,0,0,0,0,0,0,0],"calories":200,"distance":0.05,"step":11},
{"date":"2025.02.10 10:02:00","arraySteps":[20,0,0,0,0,0,0,0,0,0],"calories":160,"distance":0.01,"step":12},
{"date":"2025.02.10 10:03:00","arraySteps":[19,0,0,0,0,0,0,0,0,0],"calories":111,"distance":0.01,"step":1013},
{"date":"2025.02.10 10:21:00","arraySteps":[54,33,2,0,0,0,0,0,0,0],"calories":180,"distance":0.06,"step":21},
{"date":"2025.02.10 10:25:00","arraySteps":[38,0,0,15,0,0,0,0,0,0],"calories":120,"distance":0.03,"step":25},
{"date":"2025.02.10 10:30:00","arraySteps":[50,16,0,55,23,0,0,18,46,0],"calories":6,"distance":140,"step":30},
{"date":"2025.02.10 10:31:00","arraySteps":[18,0,0,20,0,0,0,0,0,0],"calories":170,"distance":2530.5,"step":1031}
]
"#
.to_string();
let start_time = Instant::now(); let insertion_result = insert(DATABASE_NAME, TABLE_NAME, &json_data);
let duration = start_time.elapsed(); println!("Insertion result: {}", insertion_result.unwrap());
println!("Time taken for insertion: {:.3} seconds", duration.as_secs_f64());
let sql_query = format!(r#"SELECT * FROM activitydetails ORDER BY date ASC"#);
let query_result = query(DATABASE_NAME, &sql_query, None, None).await;
println!("query_result: {}", query_result.unwrap()["json_value"]);
}
#[allow(dead_code)]
async fn test_s3_sync() {
const USERNAME: &str = "ahmed_test";
const DATABASE_NAME: &str = "zivaring";
const TABLE_NAME: &str = "activitydetails";
init_timon("tmp", 43200, USERNAME).unwrap();
let bucket_endpoint = "https://s3.us-west-2.amazonaws.com";
let bucket_name = "zivaoneapp";
let access_key_id = "xxx";
let secret_access_key = "xxx";
let bucket_region = "us-west-2";
let init_bucket_result = init_bucket(bucket_endpoint, bucket_name, access_key_id, secret_access_key, bucket_region).unwrap();
println!("init_bucket_result: {}", init_bucket_result);
let fetch_range = std::collections::HashMap::from([("start_date", "2025-01-01"), ("end_date", "2025-12-30")]);
let cloud_fetch_parquet_result = cloud_fetch_parquet(USERNAME, DATABASE_NAME, TABLE_NAME, fetch_range.clone()).await;
println!("{}", cloud_fetch_parquet_result.unwrap());
let cloud_sink_parquet_result = cloud_sink_parquet(DATABASE_NAME, TABLE_NAME).await;
println!("{}", cloud_sink_parquet_result.unwrap());
let cloud_sync_parquet_result = cloud_sync_parquet(DATABASE_NAME, TABLE_NAME, fetch_range.clone(), None);
println!("{}", cloud_sync_parquet_result.await.unwrap());
}
#[cfg(all(not(feature = "dev_cli"), not(feature = "cloud_server")))]
fn main() {
tokio::runtime::Runtime::new().expect("Failed to create runtime").block_on(async {
});
}
#[allow(dead_code)]
async fn test_ziva_ring_insert() -> Result<(), Box<dyn std::error::Error>> {
const STORAGE_PATH: &str = "tmp";
const USERNAME: &str = "ahmed_test";
let timon_result = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
println!("init_timon -> {}", timon_result);
const DATABASE_NAME: &str = "zivaring";
let database_result = create_database(DATABASE_NAME);
println!("create_database -> {}", database_result.unwrap());
let activity_details_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"step": {
"type": "int"
},
"arraySteps": {
"type": "array"
},
"calories": {
"type": "float"
},
"distance": {
"type": "float"
}
}
"#;
let spo2_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"automaticSpo2Data": {
"type": "int"
}
}
"#;
let heartrate_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"singleHR": {
"type": "int"
}
}
"#;
let hrv_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"hrv": {
"type": "int"
},
"heartRate": {
"type": "int"
},
"stress": {
"type": "int"
},
"diastolicBP": {
"type": "int"
},
"systolicBP": {
"type": "int"
},
"vascularAging": {
"type": "int"
},
"is_sync": {
"type": "bool"
}
}
"#;
let sleep_schema = r#"
{
"date": {
"type":"int",
"required":true,
"unique":true,
"datetime":true
},
"unitLength":{
"type":"int"
},
"quality":{
"type":"int"
},
"start":{
"type":"string"
}
}
"#;
let temperature_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"temperature": {
"type": "float"
}
}
"#;
let activity_details_result = create_table(DATABASE_NAME, "activitydetails", &activity_details_schema);
println!("Create activitydetails table -> {}", activity_details_result.unwrap());
let sleep_result = create_table(DATABASE_NAME, "sleep", &sleep_schema);
println!("Create sleep table -> {}", sleep_result.unwrap());
let spo2_result = create_table(DATABASE_NAME, "spo2", &spo2_schema);
println!("Create SPO2 table -> {}", spo2_result.unwrap());
let hr_result = create_table(DATABASE_NAME, "heartrate", &heartrate_schema);
println!("Create heart rate table -> {}", hr_result.unwrap());
let hrv_result = create_table(DATABASE_NAME, "hrv_table", &hrv_schema);
println!("Create HRV table -> {}", hrv_result.unwrap());
let temp_result = create_table(DATABASE_NAME, "temperature_readings", &temperature_schema);
println!("Create temperature table -> {}", temp_result.unwrap());
let file_content =
std::fs::read_to_string("/home/ahmed/Documents/ziva_data_android.json").map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
let json_data: serde_json::Value = serde_json::from_str(&file_content).map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
let start_time = Instant::now();
if let Some(activity_details) = json_data["activitydetails"].as_array() {
let formatted_activity_details: Vec<serde_json::Value> = activity_details
.iter()
.map(|reading| {
let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
json!({
"date": date,
"step": reading["step"],
"arraySteps": reading["arraySteps"],
"calories": reading["calories"],
"distance": reading["distance"]
})
})
.collect();
let activity_details_json = serde_json::to_string(&formatted_activity_details)?;
let insertion_result = insert(DATABASE_NAME, "activitydetails", &activity_details_json)?;
println!("Activity details insertion result: {}", insertion_result);
}
if let Some(spo2) = json_data["spo2"].as_array() {
let formatted_spo2: Vec<serde_json::Value> = spo2
.iter()
.map(|reading| {
let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
json!({
"date": date,
"automaticSpo2Data": reading["automaticSpo2Data"]
})
})
.collect();
let spo2_json = serde_json::to_string(&formatted_spo2)?;
let insertion_result = insert(DATABASE_NAME, "spo2", &spo2_json)?;
println!("SPO2 insertion result: {}", insertion_result);
}
if let Some(heartrate) = json_data["heartrate"].as_array() {
let formatted_hr: Vec<serde_json::Value> = heartrate
.iter()
.map(|reading| {
let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
json!({
"date": date,
"singleHR": reading["singleHR"]
})
})
.collect();
let heartrate_json = serde_json::to_string(&formatted_hr)?;
let insertion_result = insert(DATABASE_NAME, "heartrate", &heartrate_json)?;
println!("Heart rate insertion result: {}", insertion_result);
}
if let Some(hrv) = json_data["hrv_table"].as_array() {
let formatted_hrv: Vec<serde_json::Value> = hrv
.iter()
.map(|reading| {
let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
json!({
"date": date,
"heartRate": reading["heartRate"],
"hrv": reading["hrv"],
"stress": reading["stress"],
"vascularAging": reading["vascularAging"],
"diastolicBP": reading["diastolicBP"],
"systolicBP": reading["systolicBP"],
})
})
.collect();
let hrv_json = serde_json::to_string(&formatted_hrv)?;
let insertion_result = insert(DATABASE_NAME, "hrv_table", &hrv_json)?;
println!("HRV insertion result: {}", insertion_result);
}
if let Some(sleep) = json_data["sleep"].as_array() {
let formatted_sleep: Vec<serde_json::Value> = sleep
.iter()
.map(|reading| {
let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
json!({
"date": date,
"unitLength": reading["unitLength"],
"quality": reading["quality"],
"start": reading["start"]
})
})
.collect();
let sleep_json = serde_json::to_string(&formatted_sleep)?;
let insertion_result = insert(DATABASE_NAME, "sleep", &sleep_json)?;
println!("Sleep insertion result: {}", insertion_result);
}
if let Some(temperature) = json_data["temperature_table"].as_array() {
let formatted_temp: Vec<serde_json::Value> = temperature
.iter()
.map(|reading| {
let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
json!({
"date": date,
"temperature": reading["temperature"]
})
})
.collect();
let temperature_json = serde_json::to_string(&formatted_temp)?;
let insertion_result = insert(DATABASE_NAME, "temperature_readings", &temperature_json)?;
println!("Temperature insertion result: {}", insertion_result);
}
let duration = start_time.elapsed();
println!("Total time taken for all insertions: {:.3} seconds", duration.as_secs_f64());
Ok(())
}
#[allow(dead_code)]
async fn test_ziva_ring_query() -> Result<(), Box<dyn std::error::Error>> {
const STORAGE_PATH: &str = "tmp";
const DATABASE_NAME: &str = "zivaring";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing queries for user: {} ===", username);
let start_time = Instant::now();
let activity_details_query = format!(r#"SELECT * FROM activitydetails"#);
let activity_details_result = query(DATABASE_NAME, &activity_details_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Activity details {} (Time taken: {:.3} seconds)",
activity_details_result["status"],
duration.as_secs_f64()
);
let start_time = Instant::now();
let spo2_query = format!(r#"SELECT * FROM spo2"#);
let spo2_result = query(DATABASE_NAME, &spo2_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"SPO2 readings {} (Time taken: {:.3} seconds)",
spo2_result["status"],
duration.as_secs_f64()
);
let start_time = Instant::now();
let hr_query = format!(r#"SELECT * FROM heartrate"#);
let hr_result = query(DATABASE_NAME, &hr_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Heart rate readings {} (Time taken: {:.3} seconds)",
hr_result["status"],
duration.as_secs_f64()
);
let start_time = Instant::now();
let hrv_query = format!(r#"SELECT * FROM hrv_table"#);
let hrv_result = query(DATABASE_NAME, &hrv_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"HRV readings {} (Time taken: {:.3} seconds)",
hrv_result["status"],
duration.as_secs_f64()
);
let start_time = Instant::now();
let temp_query = format!(r#"SELECT * FROM temperature_readings"#);
let temp_result = query(DATABASE_NAME, &temp_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Temperature readings {} (Time taken: {:.3} seconds)",
temp_result["status"],
duration.as_secs_f64()
);
println!("\nTesting specific queries:");
let start_time = Instant::now();
let avg_hr_query = format!(r#"SELECT * FROM heartrate"#);
let avg_hr_result = query(DATABASE_NAME, &avg_hr_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Average heart rate {} (Time taken: {:.3} seconds)",
avg_hr_result["status"],
duration.as_secs_f64()
);
let start_time = Instant::now();
let max_spo2_query = format!(r#"SELECT * FROM spo2"#);
let max_spo2_result = query(DATABASE_NAME, &max_spo2_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Max SPO2: {} (Time taken: {:.3} seconds)",
max_spo2_result["status"],
duration.as_secs_f64()
);
let start_time = Instant::now();
let stress_query = format!(r#"SELECT * FROM hrv_table"#);
let stress_result = query(DATABASE_NAME, &stress_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Stress levels over time: {} (Time taken: {:.3} seconds)",
stress_result["status"],
duration.as_secs_f64()
);
}
Ok(())
}
#[allow(dead_code)]
async fn test_ziva_join_query() -> Result<(), Box<dyn std::error::Error>> {
const STORAGE_PATH: &str = "tmp";
const DATABASE_NAME: &str = "zivaring";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing JOIN query for user: {} ===", username);
let start_time = Instant::now();
let sql_query = "SELECT * FROM activitydetails JOIN spo2 ON to_char(to_timestamp(activitydetails.date), 'YYYY-MM-DD') = to_char(to_timestamp(spo2.date), 'YYYY-MM-DD') LIMIT 100";
let result = query(DATABASE_NAME, sql_query, None, None).await?;
let duration = start_time.elapsed();
println!("Query time: {:.3} seconds", duration.as_secs_f64());
println!("JOIN Query Result: {} status: {}", result["json_value"], result["status"]);
}
Ok(())
}
fn generate_spo2_data(start: &str, end: &str) -> Result<String, Box<dyn std::error::Error>> {
use chrono::{Duration, NaiveDateTime};
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};
fn get_random_number(max: u32) -> u32 {
let seed = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u32;
seed % max
}
let start_time = NaiveDateTime::parse_from_str(start, "%Y-%m-%d %H:%M:%S")?;
let end_time = NaiveDateTime::parse_from_str(end, "%Y-%m-%d %H:%M:%S")?;
let mut data = Vec::new();
let mut current = start_time;
while current < end_time {
data.push(json!({
"date": current.format("%Y-%m-%d %H:%M:%S").to_string(),
"automaticSpo2Data": get_random_number(100)
}));
current = current + Duration::minutes(5);
}
Ok(serde_json::to_string(&data)?)
}
#[allow(dead_code)]
async fn insert_ziva_data_six_months() -> Result<(), Box<dyn std::error::Error>> {
const STORAGE_PATH: &str = "tmp";
const USERNAME: &str = "ahmed_test";
const DATABASE_NAME: &str = "zivaring";
let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
let _ = create_database(DATABASE_NAME);
let spo2_schema = r#"
{
"date": {
"type": "int",
"required": true,
"unique": true,
"datetime": true
},
"automaticSpo2Data": {
"type": "int"
}
}
"#;
let spo2_result = create_table(DATABASE_NAME, "spo2", &spo2_schema);
println!("Create SPO2 table -> {}", spo2_result.unwrap());
let start: &'static str = "2024-11-01 00:00:00";
let end = "2025-05-01 00:00:00";
let _json_data = generate_spo2_data(start, end)?;
const QUERY: &str = "SELECT * FROM spo2 ORDER BY date ASC";
let start_time = std::time::Instant::now();
let result = query(DATABASE_NAME, QUERY, None, None).await?;
let duration = start_time.elapsed();
println!("Query execution time: {:.3} seconds", duration.as_secs_f64());
println!("Result: {:?}", result.get("status").unwrap());
Ok(())
}
#[allow(dead_code)]
async fn test_ziva_range_selction_query() -> Result<(), Box<dyn std::error::Error>> {
const STORAGE_PATH: &str = "tmp";
const DATABASE_NAME: &str = "zivaring";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing range selection query for user: {} ===", username);
let start_time = Instant::now();
const QUERY_2: &str = "SELECT COUNT(*) AS total FROM activitydetails WHERE partition_date BETWEEN '2025-08-27' AND '2025-09-20'";
let result = query(DATABASE_NAME, QUERY_2, None, None).await?;
let duration = start_time.elapsed();
println!(
"Range Selection Result: {} status: {} (Time taken: {:.3} seconds)",
result["json_value"],
result["status"],
duration.as_secs_f64()
);
}
Ok(())
}
#[allow(dead_code)]
async fn test_partition_limit() -> Result<(), Box<dyn std::error::Error>> {
const STORAGE_PATH: &str = "tmp";
const USERNAME: &str = "ahmed_test";
const DATABASE_NAME: &str = "zivaring";
let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
println!("\n=== Testing Partition Limit Functionality ===");
let sql_query = "SELECT COUNT(*) AS total FROM hrv_table";
let result = query(DATABASE_NAME, sql_query, None, Some(1)).await?;
println!("Last 2 partitions result: {} status: {}", result["json_value"], result["status"]);
let result2 = query(DATABASE_NAME, sql_query, None, Some(2)).await?;
println!("Last 3 partitions result: {} status: {}", result2["json_value"], result2["status"]);
let result3 = query(DATABASE_NAME, sql_query, None, None).await?;
println!("All partitions result: {} status: {}", result3["json_value"], result3["status"]);
Ok(())
}
#[allow(dead_code)]
async fn test_sleep_queries() -> Result<(), Box<dyn std::error::Error>> {
println!("Testing Sleep Queries for Daily Vitality Score");
const STORAGE_PATH: &str = "tmp";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing Sleep Queries for user: {} ===", username);
println!("\n=== LAST NIGHT'S SLEEP DATA ===");
let sleep_dates = vec![("2025-09-22", 1758499200, 1758585599)];
for (date_label, start_ts, end_ts) in &sleep_dates {
println!("\n--- Sleep data for {} ---", date_label);
let start_time = Instant::now();
let last_night_query = format!(
r#"
SELECT
COUNT(DISTINCT start) as sleep_sessions_count,
COUNT(*) / 60.0 AS sleep_total_hours,
(
SELECT MAX(session_minutes) / 60.0
FROM (
SELECT start, COUNT(*) as session_minutes
FROM sleep
WHERE date BETWEEN {} AND {}
GROUP BY start
) as session_counts
) AS sleep_longest_session,
COUNT(CASE WHEN quality = 1 THEN 1 END) / 60.0 AS sleep_light_hours,
COUNT(CASE WHEN quality = 2 THEN 1 END) / 60.0 AS sleep_deep_hours,
COUNT(CASE WHEN quality = 3 THEN 1 END) / 60.0 AS sleep_rem_hours
FROM sleep
WHERE date BETWEEN {} AND {}
"#,
start_ts, end_ts, start_ts, end_ts
);
let last_night_result = query("zivaring", &last_night_query, None, None).await?;
let duration = start_time.elapsed();
println!("(Query time: {:.3} seconds)", duration.as_secs_f64());
let last_night_value = last_night_result["json_value"][0].clone();
println!(
"{:.1} hours total sleep, but broken into {} separate sessions - longest only {:.1} hours",
last_night_value["sleep_total_hours"].as_f64().unwrap_or(0.0),
last_night_value["sleep_sessions_count"],
last_night_value["sleep_longest_session"].as_f64().unwrap_or(0.0)
);
println!(
"Deep: {:.1}h * Light: {:.1}h * REM: {:.1}h",
last_night_value["sleep_deep_hours"].as_f64().unwrap_or(0.0),
last_night_value["sleep_light_hours"].as_f64().unwrap_or(0.0),
last_night_value["sleep_rem_hours"].as_f64().unwrap_or(0.0)
);
}
println!("\n=== SLEEP CONSISTENCY DATA (Previous 6 nights) ===");
let consistency_dates = vec![
("2025-09-22", 1758499200, 1758585599), ("2025-09-21", 1758412800, 1758499199), ("2025-09-20", 1758326400, 1758412799), ("2025-09-19", 1758240000, 1758326399), ("2025-09-18", 1758153600, 1758239999), ("2025-09-17", 1758067200, 1758153599), ];
let mut consistency_data = Vec::new();
for (date_label, start_ts, end_ts) in &consistency_dates {
let day_start_time = Instant::now();
let day_consistency_query = format!(
r#"
SELECT
'{}' as date,
COUNT(DISTINCT start) as sessions_count,
COALESCE(SUM(session_duration), 0) as total_sleep_minutes
FROM (
SELECT start, COUNT(*) as session_duration
FROM sleep
WHERE date BETWEEN {} AND {}
GROUP BY start
) as daily_sessions
"#,
date_label, start_ts, end_ts
);
let day_result = query("zivaring", &day_consistency_query, None, None).await?;
let day_duration = day_start_time.elapsed();
println!(
"day_result {} status: {} (Time: {:.3}s) \n",
day_result["json_value"],
day_result["status"],
day_duration.as_secs_f64()
);
if let Some(day_data) = day_result["json_value"].as_array().and_then(|arr| arr.get(0)) {
if let (Some(sessions), Some(minutes)) = (day_data["sessions_count"].as_i64(), day_data["total_sleep_minutes"].as_i64()) {
consistency_data.push((sessions, minutes));
}
}
}
if consistency_data.len() >= 3 {
let durations: Vec<f64> = consistency_data.iter().map(|(_, minutes)| *minutes as f64).collect();
let mean = durations.iter().sum::<f64>() / durations.len() as f64;
let variance = durations.iter().map(|duration| (duration - mean).powi(2)).sum::<f64>() / durations.len() as f64;
let stddev = variance.sqrt();
let consistency_score = if stddev <= 30.0 {
100 } else if stddev <= 60.0 {
75 } else if stddev <= 90.0 {
50 } else {
25 };
let consistency_message = if consistency_score >= 90 {
"Excellent sleep consistency this week"
} else if consistency_score >= 70 {
"Good sleep routine maintained"
} else if consistency_score >= 50 {
"Sleep schedule somewhat variable"
} else {
"Irregular sleep pattern - try consistent bedtime"
};
println!("\n=== SLEEP CONSISTENCY SCORE ===");
println!("Duration Standard Deviation: {:.1} minutes", stddev);
println!("Consistency Score: {}/100 ({})", consistency_score, consistency_message);
println!("\n=== INDIVIDUAL NIGHT DATA ===");
for (i, (date_label, _, _)) in consistency_dates.iter().enumerate() {
if i < consistency_data.len() {
let (sessions, minutes) = consistency_data[i];
println!(
"{}: {} sessions, {} minutes ({:.1}h)",
date_label,
sessions,
minutes,
minutes as f64 / 60.0
);
}
}
} else {
println!("\n=== SLEEP CONSISTENCY SCORE ===");
println!("Insufficient data for consistency scoring (need at least 3 nights)");
println!("Available data points: {}", consistency_data.len());
}
}
Ok(())
}
#[allow(dead_code)]
async fn test_hrv_queries() -> Result<(), Box<dyn std::error::Error>> {
println!("\n=== HRV QUERIES FOR RECOVERY COMPONENT ===");
const STORAGE_PATH: &str = "tmp";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing HRV queries for user: {} ===", username);
let start_time = Instant::now();
let rhr_hrv_query = r#"
WITH date_params AS (
SELECT
'2025-10-01'::DATE as target_date_local,
'America/Los_Angeles' as user_timezone,
('2025-10-01'::DATE + INTERVAL '7 hours') as day_start_utc,
('2025-10-01'::DATE + INTERVAL '1 day' + INTERVAL '7 hours') as day_end_utc,
('2025-10-01'::DATE - INTERVAL '30 days' + INTERVAL '7 hours') as baseline_start_utc,
('2025-10-01'::DATE + INTERVAL '7 hours') as baseline_end_utc
),
today_hrv_data AS (
SELECT
TO_TIMESTAMP(h.date) as hrv_timestamp,
h.hrv,
h.stress,
h."heartRate"
FROM hrv_table h
CROSS JOIN date_params dp
WHERE TO_TIMESTAMP(h.date)
BETWEEN dp.day_start_utc AND dp.day_end_utc
AND h.hrv > 0
AND h.hrv < 200
),
today_hrv_summary AS (
SELECT
COUNT(*) as reading_count,
AVG(hrv) as avg_hrv,
STDDEV(hrv) as hrv_stddev,
MIN(hrv) as min_hrv,
MAX(hrv) as max_hrv,
AVG(stress) as avg_stress,
AVG("heartRate") as avg_heart_rate
FROM today_hrv_data
),
baseline_hrv_data AS (
SELECT
DATE_TRUNC('day', TO_TIMESTAMP(h.date)) as day_utc,
h.hrv
FROM hrv_table h
CROSS JOIN date_params dp
WHERE TO_TIMESTAMP(h.date)
BETWEEN dp.baseline_start_utc AND dp.day_start_utc
AND h.hrv > 0
AND h.hrv < 200
),
daily_baseline_hrv AS (
SELECT
day_utc,
AVG(hrv) as daily_avg_hrv,
COUNT(*) as daily_reading_count
FROM baseline_hrv_data
GROUP BY day_utc
HAVING COUNT(*) >= 3
),
baseline_stats AS (
SELECT
AVG(daily_avg_hrv) as baseline_hrv,
STDDEV(daily_avg_hrv) as baseline_stddev,
MIN(daily_avg_hrv) as baseline_min,
MAX(daily_avg_hrv) as baseline_max,
COUNT(*) as baseline_days_count
FROM daily_baseline_hrv
),
hrv_recovery_calculation AS (
SELECT
th.avg_hrv as today_hrv,
th.reading_count as today_readings,
th.hrv_stddev as today_stddev,
th.min_hrv as today_min,
th.max_hrv as today_max,
th.avg_stress as today_stress,
th.avg_heart_rate as today_heart_rate,
bs.baseline_hrv,
bs.baseline_stddev,
bs.baseline_min,
bs.baseline_max,
bs.baseline_days_count,
CASE
WHEN bs.baseline_stddev > 0 AND th.avg_hrv IS NOT NULL THEN
(th.avg_hrv - bs.baseline_hrv) / bs.baseline_stddev
ELSE NULL
END as z_score,
CASE
WHEN th.avg_hrv IS NOT NULL AND bs.baseline_hrv IS NOT NULL THEN
th.avg_hrv - bs.baseline_hrv
ELSE NULL
END as hrv_deviation,
CASE
WHEN th.avg_hrv IS NOT NULL AND bs.baseline_hrv IS NOT NULL AND bs.baseline_hrv > 0 THEN
((th.avg_hrv - bs.baseline_hrv) / bs.baseline_hrv) * 100
ELSE NULL
END as hrv_percent_change
FROM today_hrv_summary th
CROSS JOIN baseline_stats bs
),
hrv_recovery_score AS (
SELECT
*,
CASE
WHEN z_score IS NULL THEN NULL
WHEN baseline_days_count < 7 THEN 75
WHEN z_score >= 1.0 THEN 100
WHEN z_score >= 0.5 THEN 85 + ((z_score - 0.5) * 30)
WHEN z_score >= 0 THEN 70 + (z_score * 30)
WHEN z_score >= -0.5 THEN 50 + ((z_score + 0.5) * 40)
WHEN z_score >= -1.0 THEN 25 + ((z_score + 1.0) * 50)
ELSE 25
END as recovery_score,
CASE
WHEN z_score IS NULL THEN 'No Data'
WHEN baseline_days_count < 7 THEN 'Building Baseline'
WHEN z_score >= 0.5 THEN 'High Recovery'
WHEN z_score >= -0.5 THEN 'Normal Recovery'
ELSE 'Low Recovery'
END as recovery_status,
CASE
WHEN z_score IS NULL THEN 'No HRV data available for today'
WHEN baseline_days_count < 7 THEN
'Building your baseline (' || baseline_days_count || ' of 30 days)'
WHEN z_score >= 1.0 THEN
'Outstanding recovery! Your HRV is significantly above baseline (+' ||
ROUND(hrv_percent_change, 1) || '%)'
WHEN z_score >= 0.5 THEN
'Excellent recovery. Your body is well-rested (+' ||
ROUND(hrv_percent_change, 1) || '%)'
WHEN z_score >= 0 THEN
'Good recovery. You are ready for normal activities'
WHEN z_score >= -0.5 THEN
'Normal recovery. Your body is functioning well'
WHEN z_score >= -1.0 THEN
'Below average recovery. Consider lighter activities today'
ELSE
'Low recovery. Your body needs rest. Focus on recovery today'
END as recovery_message,
CASE
WHEN z_score IS NULL THEN 'Sync your ring for activity guidance'
WHEN baseline_days_count < 7 THEN 'Normal activities are fine'
WHEN z_score >= 0.5 THEN 'Perfect day for intense training or challenging work'
WHEN z_score >= 0 THEN 'Good day for moderate exercise and productive work'
WHEN z_score >= -0.5 THEN 'Stick to light to moderate activities'
ELSE 'Prioritize rest, recovery, and light movement only'
END as activity_recommendation
FROM hrv_recovery_calculation
)
SELECT
dp.target_date_local as date,
ROUND(hrs.today_hrv, 1) as hrv_ms,
hrs.today_readings as hrv_reading_count,
ROUND(hrs.today_min, 1) as hrv_min,
ROUND(hrs.today_max, 1) as hrv_max,
ROUND(hrs.today_stddev, 1) as hrv_stddev,
ROUND(hrs.today_stress, 1) as stress_level,
ROUND(hrs.today_heart_rate, 1) as avg_heart_rate,
ROUND(hrs.baseline_hrv, 1) as baseline_hrv_30day,
ROUND(hrs.baseline_stddev, 1) as baseline_stddev,
ROUND(hrs.baseline_min, 1) as baseline_min,
ROUND(hrs.baseline_max, 1) as baseline_max,
hrs.baseline_days_count as baseline_days,
ROUND(hrs.z_score, 2) as z_score,
ROUND(hrs.hrv_deviation, 1) as hrv_change_ms,
ROUND(hrs.hrv_percent_change, 1) as hrv_change_percent,
hrs.recovery_status as status,
ROUND(hrs.recovery_score, 0) as recovery_component_score,
hrs.recovery_message as message,
hrs.activity_recommendation as recommendation,
CASE
WHEN hrs.today_readings >= 12 THEN 'High'
WHEN hrs.today_readings >= 6 THEN 'Medium'
WHEN hrs.today_readings >= 3 THEN 'Low'
ELSE 'Very Low'
END as data_quality,
now() as calculated_at_utc
FROM date_params dp
CROSS JOIN hrv_recovery_score hrs;
"#;
let hrv_score_result = query("zivaring", rhr_hrv_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Resting Heart Rate Result: {} status: {} (Time taken: {:.3} seconds)",
hrv_score_result["json_value"],
hrv_score_result["status"],
duration.as_secs_f64()
);
}
Ok(())
}
#[allow(dead_code)]
async fn test_rhr_queries() -> Result<(), Box<dyn std::error::Error>> {
println!("\n=== RHR QUERIES FOR HEART HEALTH COMPONENT ===");
const STORAGE_PATH: &str = "tmp";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing RHR queries for user: {} ===", username);
let start_time = Instant::now();
let rhr_sql_query = r#"
WITH date_params AS (
SELECT
to_timestamp('2025-09-01T00:00:00') AS target_date_utc,
'America/Los_Angeles' AS user_timezone,
to_timestamp('2025-09-01T01:00:00') AS sleep_window_start_utc,
to_timestamp('2025-09-01T13:00:00') AS sleep_window_end_utc,
to_timestamp('2025-09-01T07:00:00') AS day_start_utc,
to_timestamp('2025-10-01T07:00:00') AS day_end_utc
),
-- sleep timestamps (epoch seconds as BIGINT + timestamp)
sleep_timestamps AS (
SELECT
CAST(s.date AS BIGINT) AS sleep_epoch,
to_timestamp_seconds(CAST(s.date AS BIGINT)) AS sleep_timestamp_utc
FROM sleep s
CROSS JOIN date_params dp
WHERE to_timestamp_seconds(CAST(s.date AS BIGINT))
BETWEEN dp.sleep_window_start_utc AND dp.sleep_window_end_utc
),
-- heart rate rows that align to a sleep timestamp within +/-30s
hr_during_sleep AS (
SELECT
hr."singleHR" AS heart_rate,
CAST(hr.date AS BIGINT) AS hr_epoch,
to_timestamp_seconds(CAST(hr.date AS BIGINT)) AS hr_timestamp_utc
FROM heartrate hr
WHERE hr."singleHR" BETWEEN 40 AND 120
AND EXISTS (
SELECT 1
FROM sleep_timestamps st
WHERE ABS(CAST(hr.date AS BIGINT) - st.sleep_epoch) <= 30
)
),
-- SAFE pattern: only compute aggregates if count > 0
sleep_based_rhr AS (
SELECT
COUNT(*) AS reading_count,
CASE WHEN COUNT(*) > 0 THEN APPROX_PERCENTILE_CONT(heart_rate, 0.2) ELSE NULL END AS rhr_value,
CASE WHEN COUNT(*) > 0 THEN AVG(heart_rate) ELSE NULL END AS avg_hr,
CASE WHEN COUNT(*) > 0 THEN MIN(heart_rate) ELSE NULL END AS min_hr,
CASE WHEN COUNT(*) > 0 THEN MAX(heart_rate) ELSE NULL END AS max_hr,
'sleep_based' AS rhr_source
FROM hr_during_sleep
),
-- all-day hr rows (filtered)
hr_all_day AS (
SELECT
hr."singleHR" AS heart_rate
FROM heartrate hr
WHERE hr."singleHR" BETWEEN 40 AND 120
AND to_timestamp_seconds(CAST(hr.date AS BIGINT))
BETWEEN (SELECT day_start_utc FROM date_params)
AND (SELECT day_end_utc FROM date_params)
),
all_day_rhr AS (
SELECT
COUNT(*) AS reading_count,
CASE WHEN COUNT(*) > 0 THEN APPROX_PERCENTILE_CONT(heart_rate, 0.2) ELSE NULL END AS rhr_value,
CASE WHEN COUNT(*) > 0 THEN AVG(heart_rate) ELSE NULL END AS avg_hr,
CASE WHEN COUNT(*) > 0 THEN MIN(heart_rate) ELSE NULL END AS min_hr,
CASE WHEN COUNT(*) > 0 THEN MAX(heart_rate) ELSE NULL END AS max_hr,
'all_day_fallback' AS rhr_source
FROM hr_all_day
),
today_rhr AS (
SELECT
COALESCE(
CASE WHEN sb.reading_count >= 10 THEN sb.rhr_value END,
CASE WHEN ad.reading_count >= 10 THEN ad.rhr_value END
) AS rhr_value,
COALESCE(
CASE WHEN sb.reading_count >= 10 THEN sb.reading_count END,
CASE WHEN ad.reading_count >= 10 THEN ad.reading_count END,
0
) AS reading_count,
COALESCE(
CASE WHEN sb.reading_count >= 10 THEN sb.rhr_source END,
CASE WHEN ad.reading_count >= 10 THEN ad.rhr_source END,
'no_data'
) AS rhr_source,
COALESCE(
CASE WHEN sb.reading_count >= 10 THEN sb.avg_hr END,
CASE WHEN ad.reading_count >= 10 THEN ad.avg_hr END
) AS avg_hr,
COALESCE(
CASE WHEN sb.reading_count >= 10 THEN sb.min_hr END,
CASE WHEN ad.reading_count >= 10 THEN ad.min_hr END
) AS min_hr,
COALESCE(
CASE WHEN sb.reading_count >= 10 THEN sb.max_hr END,
CASE WHEN ad.reading_count >= 10 THEN ad.max_hr END
) AS max_hr,
sb.reading_count AS sleep_reading_count,
sb.rhr_value AS sleep_rhr_value,
ad.reading_count AS all_day_reading_count,
ad.rhr_value AS all_day_rhr_value
FROM sleep_based_rhr sb
CROSS JOIN all_day_rhr ad
),
-- baseline per day: compute daily counts + percentile safely per day
baseline_rhr_data AS (
SELECT
DATE_TRUNC('day', to_timestamp_seconds(CAST(hr.date AS BIGINT))) AS day_utc,
COUNT(hr."singleHR") AS cnt,
CASE WHEN COUNT(hr."singleHR") > 0
THEN APPROX_PERCENTILE_CONT(hr."singleHR", 0.2)
ELSE NULL END AS daily_rhr
FROM heartrate hr
WHERE to_timestamp_seconds(CAST(hr.date AS BIGINT))
BETWEEN to_timestamp('2025-09-23T07:00:00') AND to_timestamp('2025-09-01T06:59:59')
AND hr."singleHR" BETWEEN 40 AND 120
GROUP BY DATE_TRUNC('day', to_timestamp_seconds(CAST(hr.date AS BIGINT)))
),
baseline_rhr AS (
SELECT
AVG(daily_rhr) AS baseline_rhr_value,
STDDEV(daily_rhr) AS baseline_rhr_stddev,
COUNT(*) AS baseline_days_count
FROM baseline_rhr_data
),
rhr_analysis AS (
SELECT
tr.rhr_value AS today_rhr,
tr.reading_count,
tr.rhr_source,
tr.avg_hr,
tr.min_hr,
tr.max_hr,
tr.sleep_reading_count,
tr.sleep_rhr_value,
tr.all_day_reading_count,
tr.all_day_rhr_value,
br.baseline_rhr_value,
br.baseline_rhr_stddev,
br.baseline_days_count,
ROUND(tr.rhr_value - br.baseline_rhr_value, 1) AS rhr_change_bpm,
CASE
WHEN tr.rhr_value IS NULL THEN 'No Data'
WHEN br.baseline_days_count < 3 THEN 'Building Baseline'
WHEN (tr.rhr_value - br.baseline_rhr_value) <= -3 THEN 'Improving'
WHEN ABS(tr.rhr_value - br.baseline_rhr_value) <= 3 THEN 'Stable'
WHEN (tr.rhr_value - br.baseline_rhr_value) <= 5 THEN 'Slightly Elevated'
WHEN (tr.rhr_value - br.baseline_rhr_value) <= 10 THEN 'Elevated'
ELSE 'High'
END AS rhr_status,
CASE
WHEN tr.rhr_value IS NULL THEN NULL
WHEN br.baseline_days_count < 3 THEN 75
WHEN (tr.rhr_value - br.baseline_rhr_value) <= -3 THEN 100
WHEN ABS(tr.rhr_value - br.baseline_rhr_value) <= 3 THEN 85
WHEN (tr.rhr_value - br.baseline_rhr_value) <= 5 THEN 70
WHEN (tr.rhr_value - br.baseline_rhr_value) <= 10 THEN 50
ELSE 30
END AS rhr_score
FROM today_rhr tr
CROSS JOIN baseline_rhr br
)
SELECT
dp.target_date_utc AS date,
ROUND(ra.today_rhr, 1) AS resting_heart_rate_bpm,
ra.rhr_source AS calculation_method,
ra.reading_count AS hr_readings_used,
ROUND(ra.avg_hr, 1) AS average_heart_rate,
ROUND(ra.min_hr, 1) AS minimum_heart_rate,
ROUND(ra.max_hr, 1) AS maximum_heart_rate,
ROUND(ra.sleep_rhr_value, 1) AS sleep_based_rhr,
ra.sleep_reading_count AS sleep_hr_count,
ROUND(ra.all_day_rhr_value, 1) AS all_day_rhr,
ra.all_day_reading_count AS all_day_hr_count,
ROUND(ra.baseline_rhr_value, 1) AS baseline_rhr_7day,
ROUND(ra.baseline_rhr_stddev, 1) AS baseline_stddev,
ra.baseline_days_count AS baseline_days,
ra.rhr_change_bpm AS rhr_change,
ra.rhr_status AS status,
ra.rhr_score AS rhr_component_score,
dp.sleep_window_start_utc,
dp.sleep_window_end_utc,
now() AS calculated_at_utc
FROM date_params dp
CROSS JOIN rhr_analysis ra;
"#;
let rhr_score_result = query("zivaring", rhr_sql_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"Resting Heart Rate Result: {} status: {} (Time taken: {:.3} seconds)",
rhr_score_result["json_value"],
rhr_score_result["status"],
duration.as_secs_f64()
);
}
Ok(())
}
#[allow(dead_code)]
async fn test_vitality_queries() -> Result<(), Box<dyn std::error::Error>> {
println!("\n=== VITALITY QUERIES COMPONENT ===");
const STORAGE_PATH: &str = "tmp";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing Vitality queries for user: {} ===", username);
let start_time = Instant::now();
let vitality_sql_query = r#"
WITH date_params AS (
SELECT
to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC' as now_utc,
DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') as today_utc,
DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '1 day' as yesterday_utc,
DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '7 days' as week_ago,
DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '14 days' as two_weeks_ago,
DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '30 days' as month_ago,
-- Sleep window: yesterday 6pm to today 6pm
(DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '1 day') + INTERVAL '18 hours' as sleep_window_start,
DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') + INTERVAL '18 hours' as sleep_window_end,
65 as user_age
),
hrv_component AS (
WITH hrv_data_availability AS (
SELECT
COUNT(DISTINCT DATE_TRUNC('day', TO_TIMESTAMP(date))) as days_available
FROM hrv_table
WHERE TO_TIMESTAMP(date) >= (SELECT month_ago FROM date_params)
AND hrv BETWEEN 1 AND 200
),
progressive_baseline AS (
SELECT
da.days_available,
CASE
WHEN da.days_available >= 30 THEN 30
WHEN da.days_available >= 14 THEN da.days_available
WHEN da.days_available >= 7 THEN da.days_available
WHEN da.days_available >= 3 THEN da.days_available
WHEN da.days_available >= 1 THEN da.days_available
ELSE 0
END as baseline_days,
CASE
WHEN da.days_available >= 30 THEN 1.0
WHEN da.days_available >= 14 THEN 0.9
WHEN da.days_available >= 7 THEN 0.75
WHEN da.days_available >= 3 THEN 0.5
WHEN da.days_available >= 1 THEN 0.3
ELSE 0.0
END as confidence_factor,
CASE
WHEN da.days_available >= 30 THEN 'Your recovery'
WHEN da.days_available >= 14 THEN 'Your recovery (personalizing)'
WHEN da.days_available >= 7 THEN 'Recovery (learning your pattern)'
WHEN da.days_available >= 3 THEN 'Early recovery data'
WHEN da.days_available >= 1 THEN 'Recovery: Calculating...'
ELSE 'No recovery data'
END as confidence_message
FROM hrv_data_availability da
),
baseline_stats AS (
SELECT
pb.baseline_days,
pb.confidence_factor,
pb.confidence_message,
AVG(h.hrv) as baseline_mean,
STDDEV(h.hrv) as baseline_stddev
FROM hrv_table h
CROSS JOIN progressive_baseline pb
WHERE TO_TIMESTAMP(h.date) >=
(SELECT today_utc FROM date_params) - INTERVAL '30' day
AND h.hrv BETWEEN 1 AND 200
GROUP BY pb.baseline_days, pb.confidence_factor, pb.confidence_message
),
today_hrv AS (
SELECT
AVG(hrv) as today_mean,
AVG(stress) as today_stress,
COUNT(*) as readings
FROM hrv_table
WHERE DATE_TRUNC('day', TO_TIMESTAMP(date)) =
(SELECT today_utc FROM date_params)
AND hrv BETWEEN 1 AND 200
)
SELECT
t.today_mean as current_hrv,
t.readings as hrv_readings,
b.baseline_mean,
b.baseline_stddev,
b.confidence_factor,
b.confidence_message,
t.today_stress,
CASE
WHEN b.baseline_stddev > 0 AND t.today_mean IS NOT NULL THEN
((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor
ELSE 0.0
END as adjusted_z_score,
CASE
WHEN b.baseline_stddev = 0 OR t.today_mean IS NULL THEN NULL
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 1.0
THEN 100.0
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.5
THEN 85.0 + (((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor - 0.5) * 30.0
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.0
THEN 70.0 + ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor * 30.0
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -0.5
THEN 50.0 + (((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor + 0.5) * 40.0
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -1.0
THEN 25.0 + (((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor + 1.0) * 50.0
ELSE 25.0
END as recovery_score_100,
CASE
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 1.0
THEN 'Excellent'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.5
THEN 'Great'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.0
THEN 'Good'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -0.5
THEN 'Fair'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -1.0
THEN 'Low'
ELSE 'Poor'
END as recovery_status,
CASE
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 1.0
THEN 'Your body is fully recovered'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.5
THEN 'Strong recovery, ready for activity'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.0
THEN 'Normal recovery level'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -0.5
THEN 'Adequate recovery'
WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -1.0
THEN 'Below normal recovery'
ELSE 'Prioritize rest and recovery'
END as recovery_message
FROM today_hrv t
CROSS JOIN baseline_stats b
),
sleep_quality_component AS (
WITH last_night_sleep AS (
SELECT
COUNT(*) as total_minutes,
COUNT(DISTINCT start) as session_count,
SUM(CASE WHEN quality = 3 THEN 1 ELSE 0 END) as deep_minutes,
SUM(CASE WHEN quality = 5 THEN 1 ELSE 0 END) as rem_minutes,
SUM(CASE WHEN quality = 2 THEN 1 ELSE 0 END) as light_minutes,
SUM(CASE WHEN quality = 1 THEN 1 ELSE 0 END) as awake_minutes
FROM sleep
WHERE TO_TIMESTAMP(date) >=
(SELECT sleep_window_start FROM date_params)
AND TO_TIMESTAMP(date) <
(SELECT sleep_window_end FROM date_params)
),
sleep_scoring AS (
SELECT
ls.*,
dp.user_age,
CASE
WHEN dp.user_age >= 65 THEN 420.0
WHEN dp.user_age >= 50 THEN 390.0
ELSE 420.0
END as min_optimal,
CASE
WHEN dp.user_age >= 65 THEN 540.0
WHEN dp.user_age >= 50 THEN 480.0
ELSE 510.0
END as max_optimal
FROM last_night_sleep ls
CROSS JOIN date_params dp
),
sleep_calculation AS (
SELECT
total_minutes,
session_count,
deep_minutes,
rem_minutes,
light_minutes,
awake_minutes,
-- Duration scoring
CASE
WHEN total_minutes BETWEEN min_optimal AND max_optimal THEN 100.0
WHEN total_minutes < min_optimal THEN
GREATEST(25.0, 100.0 - ((min_optimal - total_minutes) * 0.5))
WHEN total_minutes <= 600 THEN 90.0
ELSE 75.0
END as duration_score,
-- Continuity scoring
CASE
WHEN session_count = 1 THEN 100.0
WHEN session_count = 2 THEN 85.0
WHEN session_count <= 4 THEN 70.0
WHEN session_count <= 6 THEN 55.0
ELSE 40.0
END as continuity_score,
-- Quality based on sleep architecture
CASE
WHEN total_minutes = 0 THEN 0.0
WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.25 THEN 100.0
WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.20 THEN 85.0
WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.15 THEN 70.0
WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.10 THEN 55.0
ELSE 40.0
END as architecture_score,
CASE
WHEN total_minutes BETWEEN min_optimal AND max_optimal THEN 'Perfect sleep duration'
WHEN total_minutes < min_optimal - 60 THEN 'Significantly under-slept'
WHEN total_minutes < min_optimal THEN 'Less sleep than ideal'
WHEN total_minutes > 600 THEN 'Very long sleep - check energy levels'
WHEN total_minutes > max_optimal THEN 'Extended sleep detected'
ELSE 'Good sleep duration'
END as duration_message,
CASE
WHEN session_count = 1 THEN 'Excellent sleep continuity'
WHEN session_count = 2 THEN 'Minimal interruptions'
WHEN session_count <= 4 THEN 'Some sleep fragmentation'
WHEN session_count <= 6 THEN 'Fragmented sleep'
ELSE 'Highly fragmented sleep'
END as continuity_message,
CASE
WHEN total_minutes = 0 THEN 'No sleep data'
WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.20
THEN CONCAT('Good restorative sleep (',
CAST(ROUND((deep_minutes + rem_minutes) * 100.0 / total_minutes, 0) AS VARCHAR),
'% deep+REM)')
ELSE CONCAT('Limited restorative sleep (',
CAST(ROUND((deep_minutes + rem_minutes) * 100.0 / total_minutes, 0) AS VARCHAR),
'% deep+REM)')
END as architecture_message,
CASE
WHEN total_minutes < min_optimal - 120 THEN 'very_short_sleep'
WHEN total_minutes < min_optimal - 60 THEN 'short_sleep'
WHEN total_minutes > 720 THEN 'excessive_sleep'
WHEN total_minutes > 600 THEN 'oversleep'
ELSE NULL
END as sleep_alert
FROM sleep_scoring
)
SELECT
total_minutes,
session_count,
deep_minutes,
rem_minutes,
light_minutes,
awake_minutes,
duration_score,
continuity_score,
architecture_score,
duration_message,
continuity_message,
architecture_message,
sleep_alert,
(duration_score * 0.4 + continuity_score * 0.3 + architecture_score * 0.3) as sleep_quality_score_100
FROM sleep_calculation
),
sleep_consistency_component AS (
WITH previous_nights AS (
SELECT
DATE_TRUNC('day', TO_TIMESTAMP(date)) as sleep_date,
COUNT(*) as night_minutes,
COUNT(DISTINCT start) as sessions
FROM sleep
WHERE TO_TIMESTAMP(date) >=
(SELECT week_ago FROM date_params)
AND TO_TIMESTAMP(date) <
(SELECT sleep_window_start FROM date_params)
GROUP BY DATE_TRUNC('day', TO_TIMESTAMP(date))
),
consistency_stats AS (
SELECT
COUNT(*) as nights_count,
AVG(night_minutes) as avg_duration,
STDDEV(night_minutes) as duration_variance
FROM previous_nights
),
consistency_scoring AS (
SELECT
*,
CASE
WHEN avg_duration BETWEEN 420 AND 480 THEN 100.0
WHEN avg_duration BETWEEN 360 AND 540 THEN 80.0
ELSE 60.0
END as duration_score,
CASE
WHEN duration_variance <= 30 THEN 100.0
WHEN duration_variance <= 60 THEN 75.0
ELSE 50.0
END as variance_score,
CASE
WHEN duration_variance <= 30
THEN 'Excellent sleep consistency this week'
WHEN duration_variance <= 60
THEN 'Good sleep routine maintained'
WHEN duration_variance <= 90
THEN 'Sleep schedule somewhat variable'
ELSE 'Irregular sleep pattern - try consistent bedtime'
END as consistency_message
FROM consistency_stats
)
SELECT
nights_count,
avg_duration,
duration_variance,
consistency_message,
CASE
WHEN nights_count >= 3 THEN
(duration_score * 0.5 + variance_score * 0.5)
ELSE NULL
END as consistency_score_100,
CASE
WHEN nights_count < 3 THEN 'building_sleep_patterns'
ELSE NULL
END as consistency_alert
FROM consistency_scoring
),
rhr_component AS (
WITH daily_hr_readings AS (
SELECT
DATE_TRUNC('day', TO_TIMESTAMP(date)) as hr_date,
"singleHR"
FROM heartrate
WHERE TO_TIMESTAMP(date) >=
(SELECT week_ago FROM date_params)
AND "singleHR" BETWEEN 40 AND 120
),
daily_rhr AS (
SELECT
hr_date,
APPROX_PERCENTILE_CONT("singleHR", 0.2) as rhr_20th
FROM daily_hr_readings
GROUP BY hr_date
),
rhr_analysis AS (
SELECT
AVG(CASE WHEN hr_date < (SELECT today_utc FROM date_params)
THEN rhr_20th END) as baseline_rhr,
MAX(CASE WHEN hr_date = (SELECT today_utc FROM date_params)
THEN rhr_20th END) as today_rhr
FROM daily_rhr
),
rhr_scoring AS (
SELECT
baseline_rhr,
today_rhr,
today_rhr - baseline_rhr as rhr_change,
CASE
WHEN today_rhr - baseline_rhr <= -3 THEN 100.0
WHEN today_rhr - baseline_rhr <= 3 THEN 85.0
WHEN today_rhr - baseline_rhr <= 5 THEN 70.0
WHEN today_rhr - baseline_rhr <= 10 THEN 50.0
ELSE 30.0
END as rhr_score_100,
CASE
WHEN today_rhr - baseline_rhr <= -3 THEN 'Improving'
WHEN today_rhr - baseline_rhr <= 3 THEN 'Stable'
WHEN today_rhr - baseline_rhr <= 5 THEN 'Slightly Elevated'
WHEN today_rhr - baseline_rhr <= 10 THEN 'Elevated'
ELSE 'High'
END as rhr_status,
CASE
WHEN today_rhr - baseline_rhr <= -3 THEN 'Heart rate improving'
WHEN today_rhr - baseline_rhr <= 3 THEN 'Heart rate stable'
WHEN today_rhr - baseline_rhr <= 5 THEN 'Slightly elevated heart rate'
WHEN today_rhr - baseline_rhr <= 10 THEN 'Heart rate elevated - monitor'
ELSE 'Significant heart rate elevation'
END as rhr_message,
CASE
WHEN today_rhr - baseline_rhr > 10 THEN 'rhr_high_alert'
WHEN today_rhr - baseline_rhr > 5 THEN 'rhr_elevated'
ELSE NULL
END as rhr_alert
FROM rhr_analysis
)
SELECT * FROM rhr_scoring
),
safety_modifiers AS (
WITH temp_analysis AS (
SELECT
AVG(CASE WHEN DATE_TRUNC('day', TO_TIMESTAMP(date)) =
(SELECT today_utc FROM date_params)
THEN temperature END) as today_temp,
AVG(CASE WHEN DATE_TRUNC('day', TO_TIMESTAMP(date)) <
(SELECT today_utc FROM date_params)
THEN temperature END) as baseline_temp
FROM temperature_readings
WHERE TO_TIMESTAMP(date) >=
(SELECT two_weeks_ago FROM date_params)
AND temperature BETWEEN 30 AND 40
),
temp_scoring AS (
SELECT
today_temp - baseline_temp as temp_elevation,
CASE
WHEN today_temp - baseline_temp > 1.5 THEN 50.0
WHEN today_temp - baseline_temp > 1.0 THEN 70.0
WHEN today_temp - baseline_temp > 0.5 THEN 85.0
ELSE 100.0
END as temp_cap,
CASE
WHEN today_temp - baseline_temp > 1.5 THEN 'critical'
WHEN today_temp - baseline_temp > 1.0 THEN 'high'
WHEN today_temp - baseline_temp > 0.5 THEN 'medium'
ELSE NULL
END as temp_alert_level,
CASE
WHEN today_temp - baseline_temp > 1.5
THEN 'Temperature elevated - possible illness'
WHEN today_temp - baseline_temp > 1.0
THEN 'Slight temperature elevation'
WHEN today_temp - baseline_temp > 0.5
THEN 'Minor temperature elevation'
ELSE NULL
END as temp_message
FROM temp_analysis
),
spo2_analysis AS (
SELECT
AVG("automaticSpo2Data") as overnight_avg,
COUNT(CASE WHEN "automaticSpo2Data" < 90 THEN 1 END) as dips_below_90
FROM spo2
WHERE DATE_TRUNC('day', TO_TIMESTAMP(date)) =
(SELECT today_utc FROM date_params)
AND "automaticSpo2Data" BETWEEN 70 AND 100
),
spo2_scoring AS (
SELECT
overnight_avg,
dips_below_90,
CASE
WHEN overnight_avg < 88 THEN 40.0
WHEN overnight_avg < 92 THEN 60.0
WHEN dips_below_90 > 10 THEN 70.0
ELSE 100.0
END as spo2_cap,
CASE
WHEN overnight_avg < 88 THEN 'critical'
WHEN overnight_avg < 92 THEN 'high'
WHEN dips_below_90 > 10 THEN 'medium'
ELSE NULL
END as spo2_alert_level,
CASE
WHEN overnight_avg < 88
THEN 'Low oxygen levels detected'
WHEN overnight_avg < 92
THEN 'Oxygen levels below normal'
WHEN dips_below_90 > 10
THEN 'Multiple oxygen dips during sleep'
ELSE NULL
END as spo2_message
FROM spo2_analysis
)
SELECT
t.temp_elevation,
t.temp_cap,
t.temp_alert_level,
t.temp_message,
s.overnight_avg as spo2_avg,
s.dips_below_90,
s.spo2_cap,
s.spo2_alert_level,
s.spo2_message
FROM temp_scoring t
CROSS JOIN spo2_scoring s
),
vitality_calculation AS (
SELECT
COALESCE(h.recovery_score_100, 0.0) as recovery_score,
COALESCE(sq.sleep_quality_score_100, 0.0) as sleep_score,
COALESCE(sc.consistency_score_100, 0.0) as consistency_score,
COALESCE(r.rhr_score_100, 0.0) as rhr_score,
CASE WHEN h.recovery_score_100 IS NOT NULL THEN 0.35 ELSE 0.0 END +
CASE WHEN sq.sleep_quality_score_100 IS NOT NULL THEN 0.30 ELSE 0.0 END +
CASE WHEN sc.consistency_score_100 IS NOT NULL THEN 0.20 ELSE 0.0 END +
CASE WHEN r.rhr_score_100 IS NOT NULL THEN 0.15 ELSE 0.0 END as total_weight,
-- Sleep details
sq.total_minutes as sleep_minutes,
sq.session_count as sleep_sessions,
sq.deep_minutes,
sq.rem_minutes,
sq.light_minutes,
sq.awake_minutes,
-- HRV details
h.hrv_readings,
h.current_hrv,
h.baseline_mean as baseline_hrv,
h.adjusted_z_score as hrv_z_score,
-- RHR details
r.today_rhr,
r.baseline_rhr,
r.rhr_change,
-- Component messages
h.recovery_status,
h.recovery_message,
h.confidence_message,
sq.duration_message,
sq.continuity_message,
sq.architecture_message,
sq.sleep_alert,
sc.consistency_message,
sc.consistency_alert,
r.rhr_status,
r.rhr_message,
r.rhr_alert,
-- Safety
sm.temp_cap,
sm.temp_alert_level,
sm.temp_message,
sm.spo2_cap,
sm.spo2_alert_level,
sm.spo2_message,
75.0 as yesterday_score
FROM hrv_component h
CROSS JOIN sleep_quality_component sq
LEFT JOIN sleep_consistency_component sc ON true
LEFT JOIN rhr_component r ON true
CROSS JOIN safety_modifiers sm
)
SELECT
-- Vitality Score
CAST(LEAST(
CASE
WHEN total_weight > 0 THEN
ROUND((
recovery_score * 0.35 +
sleep_score * 0.30 +
consistency_score * 0.20 +
rhr_score * 0.15
) / total_weight * 100.0)
ELSE NULL
END,
temp_cap,
spo2_cap
) AS INTEGER) as vitality_score,
CASE
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 85 THEN 'Excellent'
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 70 THEN 'Good'
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 55 THEN 'Fair'
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 40 THEN 'Low'
ELSE 'Poor'
END as vitality_category,
CASE
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 85 THEN 'Excellent vitality!'
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 70 THEN 'Good energy today'
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 55 THEN 'Moderate vitality'
WHEN LEAST(
ROUND((recovery_score * 0.35 + sleep_score * 0.30 +
consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
temp_cap, spo2_cap) >= 40 THEN 'Low energy today'
ELSE 'Focus on recovery'
END as primary_message,
-- Component Breakdown
CAST(ROUND(recovery_score, 1) AS DOUBLE) as recovery_score,
CAST(ROUND(recovery_score * 0.35, 1) AS DOUBLE) as recovery_points,
CAST(ROUND(sleep_score, 1) AS DOUBLE) as sleep_score,
CAST(ROUND(sleep_score * 0.30, 1) AS DOUBLE) as sleep_points,
CAST(ROUND(consistency_score, 1) AS DOUBLE) as consistency_score,
CAST(ROUND(consistency_score * 0.20, 1) AS DOUBLE) as consistency_points,
CAST(ROUND(rhr_score, 1) AS DOUBLE) as rhr_score,
CAST(ROUND(rhr_score * 0.15, 1) AS DOUBLE) as rhr_points,
-- Sleep Details
sleep_minutes,
CAST(ROUND(sleep_minutes / 60.0, 1) AS DOUBLE) as sleep_hours,
sleep_sessions,
deep_minutes,
rem_minutes,
light_minutes,
awake_minutes,
CASE WHEN sleep_minutes > 0
THEN CAST(ROUND((deep_minutes + rem_minutes) * 100.0 / sleep_minutes, 1) AS DOUBLE)
ELSE 0.0
END as restorative_pct,
-- HRV Details
hrv_readings,
CAST(ROUND(current_hrv, 1) AS DOUBLE) as current_hrv,
CAST(ROUND(baseline_hrv, 1) AS DOUBLE) as baseline_hrv,
CAST(ROUND(hrv_z_score, 2) AS DOUBLE) as hrv_z_score,
-- RHR Details
CAST(ROUND(today_rhr, 1) AS DOUBLE) as today_rhr,
CAST(ROUND(baseline_rhr, 1) AS DOUBLE) as baseline_rhr,
CAST(ROUND(rhr_change, 1) AS DOUBLE) as rhr_change,
-- Messages
recovery_status,
recovery_message,
confidence_message,
duration_message as sleep_duration_message,
continuity_message as sleep_continuity_message,
architecture_message as sleep_architecture_message,
consistency_message,
rhr_status,
rhr_message,
-- Safety
temp_message,
spo2_message,
-- Metadata
CAST(ROUND(total_weight * 100, 0) AS INTEGER) as data_completeness_pct,
CURRENT_TIMESTAMP AT TIME ZONE 'UTC' as calculated_at_utc
FROM vitality_calculation;
"#;
let vitality_result = query("zivaring", vitality_sql_query, None, None).await?;
let duration = start_time.elapsed();
println!(
"vitality Result: {} status: {} (Time taken: {:.3} seconds)",
vitality_result["json_value"],
vitality_result["status"],
duration.as_secs_f64()
);
}
Ok(())
}
#[allow(dead_code)]
async fn ziva_app_queries() -> Result<(), Box<dyn std::error::Error>> {
println!("\n=== ZIVA APP QUERIES COMPONENT ===");
const STORAGE_PATH: &str = "tmp";
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();
for username in &usernames {
println!("\n=== Testing Ziva App queries for user: {} ===", username);
let start_time = Instant::now();
let query_x = r#"
WITH transformed AS (
SELECT
step,
calories,
TO_CHAR(TO_TIMESTAMP(date)::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles', '%Y.%m.%d %H:%M:%S') AS date,
TO_CHAR(TO_TIMESTAMP(date)::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles', '%Y.%m.%d') AS day,
TO_CHAR(TO_TIMESTAMP(date)::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles', '%H') AS hour
FROM activitydetails
WHERE date BETWEEN '1758499200' AND '1758585599'
)
SELECT
day,
hour,
MAX(date) AS date,
SUM(calories) AS calories,
SUM(step) AS value
FROM transformed
GROUP BY day, hour
ORDER BY day, hour;
"#;
let result_x = query("zivaring", &query_x, None, None).await?;
let duration = start_time.elapsed();
println!(
"result_x: {} status: {} (Time taken: {:.3} seconds)",
result_x["json_value"],
result_x["status"],
duration.as_secs_f64()
);
}
Ok(())
}
#[allow(dead_code)]
async fn ziva_username_query_matching() -> Result<(), Box<dyn std::error::Error>> {
println!("\n=== ZIVA USERNAME QUERY MATCHING COMPONENT ===");
const STORAGE_PATH: &str = "tmp";
const USERNAME: &str = "ahmed_test";
let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
let query_x = "SELECT COUNT(*) as full_counter FROM spo2;";
const USERNAME1: &str = "rADQkoFBr4Pks9Y2H_sriram";
const USERNAME2: &str = "7TQBn6aSe49wfnuox_roshann";
println!("\n--- Query with None (default path) ---");
let result_x = query("zivaring", &query_x, None, None).await?;
let count_none = result_x["json_value"]
.as_array()
.and_then(|arr| arr.get(0))
.and_then(|obj| obj.get("full_counter"))
.and_then(|v| v.as_u64())
.unwrap_or(0);
println!("\n--- Query with User1 ({}) ---", USERNAME1);
let result_x1 = query("zivaring", &query_x, Some(USERNAME1), None).await?;
let count_user1 = result_x1["json_value"]
.as_array()
.and_then(|arr| arr.get(0))
.and_then(|obj| obj.get("full_counter"))
.and_then(|v| v.as_u64())
.unwrap_or(0);
println!("\n--- Query with User2 ({}) ---", USERNAME2);
let result_x2 = query("zivaring", &query_x, Some(USERNAME2), None).await?;
let count_user2 = result_x2["json_value"]
.as_array()
.and_then(|arr| arr.get(0))
.and_then(|obj| obj.get("full_counter"))
.and_then(|v| v.as_u64())
.unwrap_or(0);
println!("\n--- Summary ---");
println!(" None: {} rows", count_none);
println!(" User1: {} rows", count_user1);
println!(" User2: {} rows", count_user2);
if count_none == count_user1 && count_user1 == count_user2 {
println!("\n ⚠️ WARNING: All queries returned the same count ({})!", count_none);
println!(" ⚠️ This suggests they're all querying the SAME path (likely the default path).");
} else {
println!("\n ✅ Different counts detected - queries are using different paths correctly.");
}
Ok(())
}
#[allow(dead_code)]
async fn check_ziva_fecth_time() -> Result<(), Box<dyn std::error::Error>> {
println!("\n=== CHECK ZIVA FETCH TIME COMPONENT ===");
const STORAGE_PATH: &str = "tmp";
const USERNAME: &str = "ahmed_test";
let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
init_bucket(
"https://s3.us-west-2.amazonaws.com",
"zivaoneapp",
"xxxxxxxxxx",
"xxxxxxxxxx",
"us-west-2",
)
.unwrap();
let start_time = Instant::now();
let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
let db_names = ["zivaring"];
let table_names = [
"activitydetails",
"battery_table",
"blood_glucose_table",
"heartrate",
"hrv_table",
"sleep",
"spo2",
"temperature_table",
];
let fetch_range = std::collections::HashMap::from([("start_date", "2025-01-01"), ("end_date", "2025-12-30")]);
let result = cloud_fetch_parquet_batch(&usernames, &db_names, &table_names, fetch_range).await;
let end_time = Instant::now();
let duration = end_time.duration_since(start_time);
match result {
Ok(value) => {
let json_value: serde_json::Value = serde_json::from_str(&value.to_string()).unwrap_or(json!({}));
let default_json = json!({});
let json_data = json_value.get("json_value").unwrap_or(&default_json);
let success_count = json_data.get("success_count").and_then(|v| v.as_u64()).unwrap_or(0);
let error_count = json_data.get("error_count").and_then(|v| v.as_u64()).unwrap_or(0);
let total_tasks = json_data.get("total_tasks").and_then(|v| v.as_u64()).unwrap_or(0);
let files_per_user = table_names.len();
let status_msg: String = if error_count == 0 {
"No synchronization needed (all local files current)".to_string()
} else {
format!("{} successful, {} failed", success_count, error_count)
};
println!(
"Cloud fetch performance:\nScope: {} users, {} tables, {} parquet files\nDistribution: {} ({} files), {} ({} files)\nExecution time: around {:.2} seconds\nStatus: {}",
usernames.len(),
table_names.len(),
total_tasks,
usernames[0],
files_per_user,
usernames[1],
files_per_user,
duration.as_secs_f64(),
status_msg
);
if let Some(errors) = json_data.get("errors") {
if errors.as_array().map(|a| !a.is_empty()).unwrap_or(false) {
println!("Errors: {:?}", errors);
}
}
}
Err(e) => {
eprintln!("Error in cloud_fetch_parquet_batch: {}", e);
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("cloud_fetch_parquet_batch failed: {}", e),
)));
}
}
Ok(())
}