tsdb_timon 1.1.3

Efficient local storage and Amazon S3-compatible data synchronization for time-series data, leveraging Parquet for storage and DataFusion for querying, all wrapped in a simple and intuitive API.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
mod timon_engine;
use chrono::{DateTime, Duration, Local, Utc};
use serde_json::json;
use std::time::Instant;
pub use timon_engine::{
  cloud_fetch_parquet, cloud_fetch_parquet_batch, cloud_sink_parquet, cloud_sync_parquet, create_database, create_table, delete_database,
  delete_table, init_bucket, init_timon, insert, list_databases, list_tables, query, query_df,
};
#[cfg(feature = "dev_cli")]
mod cli;
#[cfg(feature = "cloud_server")]
mod server;

#[cfg(feature = "cloud_server")]
mod cloud_server {
  use crate::server::timon_server;
  use std::io;

  #[actix_web::main]
  pub async fn main() -> io::Result<()> {
    timon_server().await
  }
}

#[cfg(feature = "dev_cli")]
mod dev_cli {
  use crate::cli::{convert_json_to_parquet, execute_query, Commands, CLI};
  use clap::Parser;

  #[tokio::main]
  pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cli = CLI::parse();

    match &cli.command {
      Commands::Convert { input, output } => {
        convert_json_to_parquet(input.as_str(), output.as_str())?;
        println!("JSON converted to Parquet successfully.");
      }
      Commands::Query { file, query } => {
        execute_query(file.as_str(), query.as_str()).await?;
      }
    }
    Ok(())
  }
}

#[cfg(feature = "cloud_server")]
fn main() {
  if let Err(e) = cloud_server::main() {
    eprintln!("Failed to start the cloud server: {}", e);
  }
}

#[cfg(feature = "dev_cli")]
fn main() {
  if let Err(e) = dev_cli::main() {
    eprintln!("Failed to build the CLI tool: {}", e);
  }
}

#[allow(dead_code)]
async fn test_concurrent_inserts() -> Result<(), Box<dyn std::error::Error>> {
  println!("\n=== TESTING CONCURRENT INSERTS TO SAME PARTITION ===");
  println!("This test will run multiple times to check for non-deterministic corruption...");

  let mut corruption_detected = false;

  let result = test_concurrent_inserts_single_run().await;
  match result {
    Ok(had_corruption) => {
      if had_corruption {
        corruption_detected = true;
        println!("\n🔴 CORRUPTION DETECTED");
      } else {
        println!("\n✅ No corruption");
      }
    }
    Err(e) => {
      eprintln!("\n❌ Error: {}", e);
    }
  }

  println!("\n{}", "=".repeat(60));
  println!("FINAL RESULTS: Corruption detected: {}", corruption_detected);
  if corruption_detected {
    println!("🔴 CONCURRENT INSERT CORRUPTION CONFIRMED!");
  } else {
    println!("✅ No corruption detected");
  }

  Ok(())
}

#[allow(dead_code)]
async fn test_concurrent_inserts_single_run() -> Result<bool, Box<dyn std::error::Error>> {
  use std::sync::Arc;
  use std::thread;

  println!("\n=== TESTING CONCURRENT INSERTS TO SAME PARTITION ===");

  const STORAGE_PATH: &str = "tmp_concurrent_test";
  const USERNAME: &str = "concurrent_test";
  const DATABASE_NAME: &str = "test_db";
  const TABLE_NAME: &str = "test_table";
  const BUCKET_INTERVAL: u32 = 43200; // Monthly partitioning

  // Clean up any existing test data - completely remove the storage directory
  println!("Cleaning up any existing test data...");
  // Remove the entire storage directory to avoid any metadata corruption issues
  if std::path::Path::new(STORAGE_PATH).exists() {
    if let Err(e) = std::fs::remove_dir_all(STORAGE_PATH) {
      eprintln!("Warning: Failed to remove storage directory: {}", e);
    }
  }

  // Small delay to ensure filesystem cleanup completes
  std::thread::sleep(std::time::Duration::from_millis(200));

  // Initialize timon with monthly partitioning
  println!("Initializing timon with monthly partitioning (43200)...");
  let _ = init_timon(STORAGE_PATH, BUCKET_INTERVAL, USERNAME).unwrap();
  let _ = create_database(DATABASE_NAME);

  // Create table schema
  let table_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "value": {
        "type": "int",
        "required": true
      },
      "thread_id": {
        "type": "int"
      }
    }
  "#;

  let _ = create_table(DATABASE_NAME, TABLE_NAME, &table_schema);
  println!("Table created successfully");

  // Ensure metadata is fully written to disk
  // Force a sync by reading it back
  std::thread::sleep(std::time::Duration::from_millis(200));

  // Verify metadata file exists and is readable
  let metadata_path = format!("{}/metadata.json", STORAGE_PATH);
  for attempt in 1..=5 {
    if let Ok(contents) = std::fs::read_to_string(&metadata_path) {
      if contents.contains(TABLE_NAME) && contents.contains(DATABASE_NAME) {
        println!("✅ Metadata file verified (attempt {})", attempt);
        break;
      } else {
        println!("⚠️  Metadata file exists but doesn't contain table (attempt {})", attempt);
      }
    } else {
      println!("⚠️  Metadata file not found yet (attempt {})", attempt);
    }
    std::thread::sleep(std::time::Duration::from_millis(100));
  }

  // Additional delay to ensure filesystem sync
  std::thread::sleep(std::time::Duration::from_millis(200));

  // Verify table exists before starting threads - try multiple times
  println!("Verifying table exists...");
  let mut verified = false;
  for attempt in 1..=5 {
    match list_tables(DATABASE_NAME) {
      Ok(tables_result) => {
        let tables_vec = tables_result["json_value"]
          .as_array()
          .and_then(|arr| Some(arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect::<Vec<_>>()))
          .unwrap_or_default();
        if tables_vec.contains(&TABLE_NAME.to_string()) {
          println!("✅ Table verified: {} (attempt {})", TABLE_NAME, attempt);
          verified = true;
          break;
        } else {
          println!("⚠️  Table not found yet, attempt {}... Available: {:?}", attempt, tables_vec);
        }
      }
      Err(e) => {
        println!("⚠️  Error listing tables, attempt {}: {}", attempt, e);
      }
    }
    std::thread::sleep(std::time::Duration::from_millis(100));
  }

  if !verified {
    return Err(format!("Table '{}' was not found after multiple attempts!", TABLE_NAME).into());
  }

  // Force a metadata reload by doing a dummy operation
  let _ = list_tables(DATABASE_NAME);
  std::thread::sleep(std::time::Duration::from_millis(100));

  // Number of threads and records per thread
  const NUM_THREADS: usize = 10;
  const RECORDS_PER_THREAD: usize = 50;
  const TOTAL_EXPECTED_RECORDS: usize = NUM_THREADS * RECORDS_PER_THREAD;

  println!(
    "\nSpawning {} threads, each inserting {} records to the same partition",
    NUM_THREADS, RECORDS_PER_THREAD
  );
  println!("Expected total records: {}", TOTAL_EXPECTED_RECORDS);

  // Track thread start/end times
  let start_time = std::time::Instant::now();

  // Use a base timestamp that will map to the same partition for all records
  let base_timestamp = chrono::Utc::now().timestamp();

  // Shared counters for tracking actual insertions
  let insert_attempts = Arc::new(std::sync::Mutex::new(0usize));
  let insert_successes = Arc::new(std::sync::Mutex::new(0usize));
  let insert_errors = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));

  // Spawn threads
  let mut handles = vec![];

  for thread_id in 0..NUM_THREADS {
    let insert_attempts_clone = Arc::clone(&insert_attempts);
    let insert_successes_clone = Arc::clone(&insert_successes);
    let insert_errors_clone = Arc::clone(&insert_errors);
    let base_timestamp_clone = base_timestamp;
    let bucket_interval = BUCKET_INTERVAL;
    let storage_path = STORAGE_PATH.to_string();
    let username = USERNAME.to_string();
    let db_name = DATABASE_NAME.to_string();
    let table_name = TABLE_NAME.to_string();

    let handle = thread::spawn(move || {
      // Each thread creates its own DatabaseManager instance
      use timon_engine::db_manager::DatabaseManager;

      // NO DELAY - all threads start simultaneously to test true concurrency
      let mut db_manager = DatabaseManager::new(
        &storage_path,
        bucket_interval, // Monthly partitioning
        &username,
      );

      // Generate records for this thread - each with unique date (required for unique constraint)
      // but all within the same month so they map to the same partition
      let mut records = Vec::new();
      for i in 0..RECORDS_PER_THREAD {
        // Use unique timestamps within the same month (same partition for monthly partitioning)
        // Add seconds to ensure uniqueness while staying in same month
        let offset_seconds = (thread_id * RECORDS_PER_THREAD + i) as i64;
        let record_timestamp = base_timestamp_clone + offset_seconds;
        let record_date = chrono::DateTime::<Utc>::from_timestamp(record_timestamp, 0)
          .unwrap()
          .format("%Y.%m.%d %H:%M:%S")
          .to_string();

        let record = json!({
          "date": record_date,
          "value": thread_id * 1000 + i,
          "thread_id": thread_id
        });
        records.push(record);
      }

      let json_data = serde_json::to_string(&records).unwrap();

      // Track attempt
      {
        let mut count = insert_attempts_clone.lock().unwrap();
        *count += RECORDS_PER_THREAD;
      }

      // Perform insert with timing
      let insert_start = std::time::Instant::now();
      match db_manager.insert(&db_name, &table_name, &json_data) {
        Ok(_) => {
          let insert_duration = insert_start.elapsed();
          let mut count = insert_successes_clone.lock().unwrap();
          *count += RECORDS_PER_THREAD;
          if thread_id == 0 && *count == RECORDS_PER_THREAD {
            // Log first successful insert timing
            eprintln!("Thread 0: First insert completed in {:.3}s", insert_duration.as_secs_f64());
          }
        }
        Err(e) => {
          let error_msg = format!("Thread {}: {}", thread_id, e);
          eprintln!("{}", error_msg);
          let mut errors = insert_errors_clone.lock().unwrap();
          errors.push(error_msg);
        }
      }
    });

    handles.push(handle);
  }

  // Wait for all threads to complete
  println!("Waiting for all threads to complete...");
  for (idx, handle) in handles.into_iter().enumerate() {
    match handle.join() {
      Ok(_) => {
        // Thread completed
      }
      Err(e) => {
        eprintln!("⚠️  Thread {} panicked: {:?}", idx, e);
      }
    }
  }

  let elapsed = start_time.elapsed();
  println!("All threads completed in {:.2} seconds", elapsed.as_secs_f64());

  let attempts = *insert_attempts.lock().unwrap();
  let successes = *insert_successes.lock().unwrap();
  let errors = insert_errors.lock().unwrap().clone();

  // Small delay to ensure all file writes are flushed
  std::thread::sleep(std::time::Duration::from_millis(300));

  println!("\n=== INSERTION RESULTS ===");
  println!("Total insertion attempts: {}", attempts);
  println!("Reported successful insertions: {}", successes);
  println!("Reported failed insertions: {}", errors.len());
  if !errors.is_empty() {
    println!("\nError details:");
    for error in errors.iter().take(5) {
      println!("  - {}", error);
    }
    if errors.len() > 5 {
      println!("  ... and {} more errors", errors.len() - 5);
    }
  }

  // Check filesystem state before querying
  println!("\n=== FILESYSTEM STATE CHECK ===");
  let table_path = format!("{}/data/{}/{}", STORAGE_PATH, DATABASE_NAME, TABLE_NAME);
  if std::path::Path::new(&table_path).exists() {
    println!("✅ Table directory exists: {}", table_path);
    let partition_count = std::fs::read_dir(&table_path)
      .map(|entries| entries.filter_map(|e| e.ok()).filter(|e| e.path().is_dir()).count())
      .unwrap_or(0);
    println!("   Found {} partition directories", partition_count);

    // Check for parquet files
    let mut parquet_files = 0;
    let mut total_size = 0u64;
    if let Ok(entries) = std::fs::read_dir(&table_path) {
      for entry in entries.flatten() {
        if entry.path().is_dir() {
          let parquet_file = entry.path().join("data.parquet");
          if parquet_file.exists() {
            parquet_files += 1;
            if let Ok(metadata) = std::fs::metadata(&parquet_file) {
              total_size += metadata.len();
            }
          }
        }
      }
    }
    println!("   Found {} parquet files, total size: {} bytes", parquet_files, total_size);
  } else {
    println!("⚠️  Table directory does NOT exist: {}", table_path);
  }

  // Now query to see how many records actually exist
  println!("\n=== QUERYING ACTUAL RECORDS ===");
  let query_result = query(DATABASE_NAME, "SELECT COUNT(*) as total FROM test_table", None, None).await?;

  let actual_count = query_result["json_value"]
    .as_array()
    .and_then(|arr| arr.get(0))
    .and_then(|obj| obj.get("total"))
    .and_then(|v| v.as_u64())
    .unwrap_or(0) as usize;

  println!("Expected records: {}", TOTAL_EXPECTED_RECORDS);
  println!("Actual records in database: {}", actual_count);

  // Check for data integrity - query all records
  let all_records_query = query(DATABASE_NAME, "SELECT * FROM test_table ORDER BY value", None, None).await?;
  let empty_array = vec![];
  let all_records = all_records_query["json_value"].as_array().unwrap_or(&empty_array);

  println!("Records retrieved from query: {}", all_records.len());

  // Check for duplicate values (should not happen with unique constraint, but let's verify)
  let mut seen_values = std::collections::HashSet::new();
  let mut duplicates = 0;
  let mut missing_threads = std::collections::HashSet::new();

  for record in all_records {
    if let Some(value) = record.get("value").and_then(|v| v.as_u64()) {
      if !seen_values.insert(value) {
        duplicates += 1;
        println!("  ⚠️  Duplicate value found: {}", value);
      }
    }
    if let Some(thread_id) = record.get("thread_id").and_then(|v| v.as_u64()) {
      missing_threads.insert(thread_id);
    }
  }

  println!("\n=== DATA INTEGRITY CHECK ===");
  println!("Duplicate values found: {}", duplicates);
  println!("Threads with data present: {}/{}", missing_threads.len(), NUM_THREADS);

  // Determine if corruption occurred
  let data_loss = TOTAL_EXPECTED_RECORDS.saturating_sub(actual_count);
  let loss_percentage = if TOTAL_EXPECTED_RECORDS > 0 {
    (data_loss as f64 / TOTAL_EXPECTED_RECORDS as f64) * 100.0
  } else {
    0.0
  };

  println!("\n=== CORRUPTION ANALYSIS ===");
  let had_corruption: bool = if actual_count < TOTAL_EXPECTED_RECORDS {
    println!("🔴 DATA LOSS DETECTED!");
    println!("   Lost {} records ({:.2}% loss)", data_loss, loss_percentage);
    println!("   This confirms concurrent insert corruption!");
    true
  } else if duplicates > 0 {
    println!("🟡 DATA INTEGRITY ISSUE!");
    println!("   Found {} duplicate values", duplicates);
    true
  } else if actual_count == TOTAL_EXPECTED_RECORDS {
    println!("✅ All records present");
    false
  } else {
    false
  };

  // Try to read the parquet file directly to check for corruption
  println!("\n=== PARQUET FILE INTEGRITY CHECK ===");
  let table_path = format!("{}/data/{}/{}", STORAGE_PATH, DATABASE_NAME, TABLE_NAME);
  println!("Table path: {}", table_path);

  // Show what partition the records should have gone to
  use timon_engine::helpers::rounded_timestamp;
  let expected_partition = rounded_timestamp(base_timestamp, BUCKET_INTERVAL);
  println!("Expected partition for all records: partition_date={}", expected_partition);

  let partition_dir = std::fs::read_dir(&table_path).ok();

  if let Some(entries) = partition_dir {
    let mut partition_count = 0;
    for entry in entries.flatten() {
      if entry.path().is_dir() {
        partition_count += 1;
        let entry_path = entry.path();
        let partition_name = entry_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown");
        println!("\nPartition found: {}", partition_name);

        let parquet_file = entry_path.join("data.parquet");
        if parquet_file.exists() {
          println!("  Parquet file: {}", parquet_file.display());
          // Try to read the file
          match std::fs::File::open(&parquet_file) {
            Ok(_) => {
              // File exists and is readable
              let file_size = std::fs::metadata(&parquet_file).map(|m| m.len()).unwrap_or(0);
              println!("  File size: {} bytes", file_size);
              if file_size == 0 {
                println!("  ⚠️  WARNING: File is empty (possible corruption)");
              } else {
                // Try to read it as parquet to check for corruption
                use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
                match SerializedFileReader::new(std::fs::File::open(&parquet_file)?) {
                  Ok(reader) => {
                    let metadata = reader.metadata();
                    let num_rows = metadata.file_metadata().num_rows();
                    println!("  ✅ File is readable, {} rows in metadata", num_rows);
                  }
                  Err(e) => {
                    println!("  🔴 CORRUPTION DETECTED: Cannot read parquet file: {}", e);
                  }
                }
              }
            }
            Err(e) => {
              println!("  🔴 ERROR: Cannot open file: {}", e);
            }
          }
        } else {
          println!("  ⚠️  No parquet file found in partition");
        }
      }
    }
    if partition_count == 0 {
      println!("  ⚠️  No partitions found in table directory");
    }
  } else {
    println!("  ⚠️  Cannot read table directory: {}", table_path);
  }

  Ok(had_corruption)
}

#[allow(dead_code)]
async fn test_local_storage() {
  const STORAGE_PATH: &str = "tmp";
  const USERNAME: &str = "ahmed_test";
  let timon_result = init_timon(STORAGE_PATH, 5, USERNAME).unwrap();
  println!("init_timon -> {}", timon_result);

  const DATABASE_NAME: &str = "zivaring";
  const TABLE_NAME: &str = "activitydetails";
  let database_result = create_database(DATABASE_NAME);
  println!("create_database -> {}", database_result.unwrap());

  let table_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "distance": {
        "type": "float",
        "max": 2500
      },
      "step": {
        "type": "int",
        "min": 10,
        "max": 100
      },
      "calories": {
        "type": "float",
        "min": 50,
        "max": 1200
      },
      "arraySteps": {
        "type": "array"
      },
      "is_sync": {
        "type": "bool"
      }
    }
  "#;
  let table_result = create_table(DATABASE_NAME, TABLE_NAME, &table_schema);
  println!("create_table -> {}", table_result.unwrap());

  let databases_list: serde_json::Value = list_databases().unwrap();
  let tables_list = list_tables(DATABASE_NAME).unwrap();
  println!("databases_list -> {:?}", databases_list);
  println!("tables_list -> {:?}", tables_list);

  struct DataPoint {
    date: String,
    array_steps: Vec<i32>,
    calories: i32,
    distance: f64,
    step: i32,
  }

  fn generate_data(n: usize) -> String {
    let start_time = Local::now().naive_local() - Duration::hours(12); // Set start time to now - 12hours
    let mut data = Vec::new();
    let mut time_counter = 0;
    for i in 0..n {
      time_counter += 1000;
      let date = start_time + Duration::milliseconds(time_counter);
      let array_steps: Vec<i32> = (0..10).map(|x| (i as i32 + x) % 50).collect();
      let calories = (i % 50) + 1;
      let distance = (i as f64 * 0.01) % 5.0;
      let step = array_steps.iter().sum::<i32>();
      data.push(json!({
          "date": date.format("%Y.%m.%d %H:%M:%S").to_string(),
          "arraySteps": array_steps,
          "calories": calories,
          "distance": distance,
          "step": step
      }));
    }
    serde_json::to_string_pretty(&data).unwrap()
  }

  // let json_data = generate_data(1_000_000);
  let json_data: String = r#"
    [
      {"date":"2025.02.10 10:00:00","arraySteps":[18,0,0,20,0,0,0,0,0,0],"calories":111,"distance":0.01,"step":10},
      {"date":"2025.02.10 10:01:00","arraySteps":[43,39,0,0,0,0,0,0,0,0],"calories":200,"distance":0.05,"step":11},
      {"date":"2025.02.10 10:02:00","arraySteps":[20,0,0,0,0,0,0,0,0,0],"calories":160,"distance":0.01,"step":12},
      {"date":"2025.02.10 10:03:00","arraySteps":[19,0,0,0,0,0,0,0,0,0],"calories":111,"distance":0.01,"step":1013},
      {"date":"2025.02.10 10:21:00","arraySteps":[54,33,2,0,0,0,0,0,0,0],"calories":180,"distance":0.06,"step":21},
      {"date":"2025.02.10 10:25:00","arraySteps":[38,0,0,15,0,0,0,0,0,0],"calories":120,"distance":0.03,"step":25},
      {"date":"2025.02.10 10:30:00","arraySteps":[50,16,0,55,23,0,0,18,46,0],"calories":6,"distance":140,"step":30},
      {"date":"2025.02.10 10:31:00","arraySteps":[18,0,0,20,0,0,0,0,0,0],"calories":170,"distance":2530.5,"step":1031}
    ]
  "#
  .to_string();

  let start_time = Instant::now(); // Start timing
  let insertion_result = insert(DATABASE_NAME, TABLE_NAME, &json_data);
  let duration = start_time.elapsed(); // Measure elapsed time
  println!("Insertion result: {}", insertion_result.unwrap());
  println!("Time taken for insertion: {:.3} seconds", duration.as_secs_f64());

  let sql_query = format!(r#"SELECT * FROM activitydetails ORDER BY date ASC"#);
  let query_result = query(DATABASE_NAME, &sql_query, None, None).await;
  println!("query_result: {}", query_result.unwrap()["json_value"]);

  // let start_time = Instant::now(); // Start timing
  // let sql_query2 = format!(r#"SELECT * FROM activitydetails LIMIT 10"#); // WHERE date BETWEEN '1730016996' AND '1739209996'
  // let query_result2 = query(DATABASE_NAME, &sql_query2, None).await;
  // let duration = start_time.elapsed(); // Measure elapsed time
  // println!("query_result: {}", query_result2.unwrap()["json_value"]);
  // println!("Time taken for query: {:.3} seconds", duration.as_secs_f64());

  // let sql_query3 = format!(r#"SELECT * FROM activitydetails"#);
  // let query_df_result = query_df(DATABASE_NAME, &sql_query3, None).await;
  // println!("query_df_result: {:?}", query_df_result.unwrap());

  // let delete_table_result = delete_table(DATABASE_NAME, "iot").unwrap();
  // println!("delete_table_result -> {}", delete_table_result);
  // let delete_database_result = delete_database(DATABASE_NAME).unwrap();
  // println!("delete_database_result -> {}", delete_database_result);
}

#[allow(dead_code)]
async fn test_s3_sync() {
  const USERNAME: &str = "ahmed_test";
  const DATABASE_NAME: &str = "zivaring";
  const TABLE_NAME: &str = "activitydetails";
  init_timon("tmp", 43200, USERNAME).unwrap();

  let bucket_endpoint = "https://s3.us-west-2.amazonaws.com";
  let bucket_name = "zivaoneapp";
  let access_key_id = "xxx";
  let secret_access_key = "xxx";
  let bucket_region = "us-west-2";
  let init_bucket_result = init_bucket(bucket_endpoint, bucket_name, access_key_id, secret_access_key, bucket_region).unwrap();
  println!("init_bucket_result: {}", init_bucket_result);

  let fetch_range = std::collections::HashMap::from([("start_date", "2025-01-01"), ("end_date", "2025-12-30")]);
  let cloud_fetch_parquet_result = cloud_fetch_parquet(USERNAME, DATABASE_NAME, TABLE_NAME, fetch_range.clone()).await;
  println!("{}", cloud_fetch_parquet_result.unwrap());

  let cloud_sink_parquet_result = cloud_sink_parquet(DATABASE_NAME, TABLE_NAME).await;
  println!("{}", cloud_sink_parquet_result.unwrap());

  let cloud_sync_parquet_result = cloud_sync_parquet(DATABASE_NAME, TABLE_NAME, fetch_range.clone(), None);
  println!("{}", cloud_sync_parquet_result.await.unwrap());
}

// This block is executed for local development testing(run async tests for local_storage and S3 cloud_sync).
#[cfg(all(not(feature = "dev_cli"), not(feature = "cloud_server")))]
fn main() {
  tokio::runtime::Runtime::new().expect("Failed to create runtime").block_on(async {
    // test_local_storage().await;
    // test_s3_sync().await;
    // let _ = test_ziva_ring_insert().await;
    // let _ = test_ziva_ring_query().await;
    // let _ = test_ziva_join_query().await;
    // let _ = insert_ziva_data_six_months().await;
    // let _ = test_ziva_range_selction_query().await;
    // let _ = test_partition_limit().await;
    // let _ = test_sleep_queries().await;
    // let _ = test_hrv_queries().await;
    // let _ = test_rhr_queries().await;
    // let _ = test_vitality_queries().await;
    // let _ = ziva_app_queries().await;
    // let _ = ziva_username_query_matching().await;
    // let _ = check_ziva_fecth_time().await;
    // let _ = test_concurrent_inserts().await;
  });
}

/*
****** bucket_interval ******
Hourly = 60
Daily = 1440
Weekly = 10080
Monthly = 43200
*/

#[allow(dead_code)]
async fn test_ziva_ring_insert() -> Result<(), Box<dyn std::error::Error>> {
  const STORAGE_PATH: &str = "tmp";
  const USERNAME: &str = "ahmed_test";
  let timon_result = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
  println!("init_timon -> {}", timon_result);

  const DATABASE_NAME: &str = "zivaring";
  let database_result = create_database(DATABASE_NAME);
  println!("create_database -> {}", database_result.unwrap());

  // Activity Details Table Schema
  let activity_details_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "step": {
        "type": "int"
      },
      "arraySteps": {
        "type": "array"
      },
      "calories": {
        "type": "float"
      },
      "distance": {
        "type": "float"
      }
    }
  "#;

  // SPO2 Table Schema
  let spo2_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "automaticSpo2Data": {
        "type": "int"
      }
    }
    "#;

  // Heart Rate Table Schema
  let heartrate_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "singleHR": {
        "type": "int"
      }
    }
    "#;

  // HRV Table Schema
  let hrv_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "hrv": {
        "type": "int"
      },
      "heartRate": {
        "type": "int"
      },
      "stress": {
        "type": "int"
      },
      "diastolicBP": {
        "type": "int"
      },
      "systolicBP": {
        "type": "int"
      },
      "vascularAging": {
        "type": "int"
      },
      "is_sync": {
        "type": "bool"
      }
    }
    "#;

  // Sleep Table Schema
  let sleep_schema = r#"
    {
      "date": {
        "type":"int",
        "required":true,
        "unique":true,
        "datetime":true
      },
      "unitLength":{
        "type":"int"
      },
      "quality":{
        "type":"int"
      },
      "start":{
        "type":"string"
      }
    }
    "#;

  // Temperature Table Schema
  let temperature_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "temperature": {
        "type": "float"
      }
    }
    "#;

  // Create tables
  let activity_details_result = create_table(DATABASE_NAME, "activitydetails", &activity_details_schema);
  println!("Create activitydetails table -> {}", activity_details_result.unwrap());

  let sleep_result = create_table(DATABASE_NAME, "sleep", &sleep_schema);
  println!("Create sleep table -> {}", sleep_result.unwrap());

  let spo2_result = create_table(DATABASE_NAME, "spo2", &spo2_schema);
  println!("Create SPO2 table -> {}", spo2_result.unwrap());

  let hr_result = create_table(DATABASE_NAME, "heartrate", &heartrate_schema);
  println!("Create heart rate table -> {}", hr_result.unwrap());

  let hrv_result = create_table(DATABASE_NAME, "hrv_table", &hrv_schema);
  println!("Create HRV table -> {}", hrv_result.unwrap());

  let temp_result = create_table(DATABASE_NAME, "temperature_readings", &temperature_schema);
  println!("Create temperature table -> {}", temp_result.unwrap());

  // Read JSON file
  let file_content =
    std::fs::read_to_string("/home/ahmed/Documents/ziva_data_android.json").map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
  let json_data: serde_json::Value = serde_json::from_str(&file_content).map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
  let start_time = Instant::now();

  // Insert activity details
  if let Some(activity_details) = json_data["activitydetails"].as_array() {
    let formatted_activity_details: Vec<serde_json::Value> = activity_details
      .iter()
      .map(|reading| {
        let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
        let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
        let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
        json!({
          "date": date,
          "step": reading["step"],
          "arraySteps": reading["arraySteps"],
          "calories": reading["calories"],
          "distance": reading["distance"]
        })
      })
      .collect();
    let activity_details_json = serde_json::to_string(&formatted_activity_details)?;
    let insertion_result = insert(DATABASE_NAME, "activitydetails", &activity_details_json)?;
    println!("Activity details insertion result: {}", insertion_result);
  }

  // Insert SPO2 readings
  if let Some(spo2) = json_data["spo2"].as_array() {
    let formatted_spo2: Vec<serde_json::Value> = spo2
      .iter()
      .map(|reading| {
        let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
        let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
        let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
        json!({
          "date": date,
          "automaticSpo2Data": reading["automaticSpo2Data"]
        })
      })
      .collect();
    let spo2_json = serde_json::to_string(&formatted_spo2)?;
    let insertion_result = insert(DATABASE_NAME, "spo2", &spo2_json)?;
    println!("SPO2 insertion result: {}", insertion_result);
  }

  // Insert heart rate readings
  if let Some(heartrate) = json_data["heartrate"].as_array() {
    let formatted_hr: Vec<serde_json::Value> = heartrate
      .iter()
      .map(|reading| {
        let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
        let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
        let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
        json!({
          "date": date,
          "singleHR": reading["singleHR"]
        })
      })
      .collect();
    let heartrate_json = serde_json::to_string(&formatted_hr)?;
    let insertion_result = insert(DATABASE_NAME, "heartrate", &heartrate_json)?;
    println!("Heart rate insertion result: {}", insertion_result);
  }

  // Insert HRV readings
  if let Some(hrv) = json_data["hrv_table"].as_array() {
    let formatted_hrv: Vec<serde_json::Value> = hrv
      .iter()
      .map(|reading| {
        let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
        let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
        let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
        json!({
          "date": date,
          "heartRate": reading["heartRate"],
          "hrv": reading["hrv"],
          "stress": reading["stress"],
          "vascularAging": reading["vascularAging"],
          "diastolicBP": reading["diastolicBP"],
          "systolicBP": reading["systolicBP"],
        })
      })
      .collect();
    let hrv_json = serde_json::to_string(&formatted_hrv)?;
    let insertion_result = insert(DATABASE_NAME, "hrv_table", &hrv_json)?;
    println!("HRV insertion result: {}", insertion_result);
  }

  // Insert sleep readings
  if let Some(sleep) = json_data["sleep"].as_array() {
    let formatted_sleep: Vec<serde_json::Value> = sleep
      .iter()
      .map(|reading| {
        let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
        let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
        let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
        json!({
          "date": date,
          "unitLength": reading["unitLength"],
          "quality": reading["quality"],
          "start": reading["start"]
        })
      })
      .collect();
    let sleep_json = serde_json::to_string(&formatted_sleep)?;
    let insertion_result = insert(DATABASE_NAME, "sleep", &sleep_json)?;
    println!("Sleep insertion result: {}", insertion_result);
  }

  // Insert temperature readings
  if let Some(temperature) = json_data["temperature_table"].as_array() {
    let formatted_temp: Vec<serde_json::Value> = temperature
      .iter()
      .map(|reading| {
        let date_str = reading["date"].as_str().unwrap_or("2025.01.01 00:00:00");
        let naive_datetime = chrono::NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S").unwrap_or_default();
        let date = DateTime::<Utc>::from_naive_utc_and_offset(naive_datetime, Utc);
        json!({
          "date": date,
          "temperature": reading["temperature"]
        })
      })
      .collect();
    let temperature_json = serde_json::to_string(&formatted_temp)?;
    let insertion_result = insert(DATABASE_NAME, "temperature_readings", &temperature_json)?;
    println!("Temperature insertion result: {}", insertion_result);
  }

  let duration = start_time.elapsed();
  println!("Total time taken for all insertions: {:.3} seconds", duration.as_secs_f64());

  Ok(())
}

#[allow(dead_code)]
async fn test_ziva_ring_query() -> Result<(), Box<dyn std::error::Error>> {
  const STORAGE_PATH: &str = "tmp";
  const DATABASE_NAME: &str = "zivaring";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing queries for user: {} ===", username);

    // Query activity details
    let start_time = Instant::now();
    let activity_details_query = format!(r#"SELECT * FROM activitydetails"#);
    let activity_details_result = query(DATABASE_NAME, &activity_details_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Activity details {} (Time taken: {:.3} seconds)",
      activity_details_result["status"],
      duration.as_secs_f64()
    );

    // Query SPO2 readings
    let start_time = Instant::now();
    let spo2_query = format!(r#"SELECT * FROM spo2"#);
    let spo2_result = query(DATABASE_NAME, &spo2_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "SPO2 readings {} (Time taken: {:.3} seconds)",
      spo2_result["status"],
      duration.as_secs_f64()
    );

    // Query heart rate readings
    let start_time = Instant::now();
    let hr_query = format!(r#"SELECT * FROM heartrate"#);
    let hr_result = query(DATABASE_NAME, &hr_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Heart rate readings {} (Time taken: {:.3} seconds)",
      hr_result["status"],
      duration.as_secs_f64()
    );

    // Query HRV readings
    let start_time = Instant::now();
    let hrv_query = format!(r#"SELECT * FROM hrv_table"#);
    let hrv_result = query(DATABASE_NAME, &hrv_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "HRV readings {} (Time taken: {:.3} seconds)",
      hrv_result["status"],
      duration.as_secs_f64()
    );

    // Query temperature readings
    let start_time = Instant::now();
    let temp_query = format!(r#"SELECT * FROM temperature_readings"#);
    let temp_result = query(DATABASE_NAME, &temp_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Temperature readings {} (Time taken: {:.3} seconds)",
      temp_result["status"],
      duration.as_secs_f64()
    );

    // Test some specific queries
    println!("\nTesting specific queries:");

    // Query for average heart rate
    let start_time = Instant::now();
    let avg_hr_query = format!(r#"SELECT * FROM heartrate"#);
    let avg_hr_result = query(DATABASE_NAME, &avg_hr_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Average heart rate {} (Time taken: {:.3} seconds)",
      avg_hr_result["status"],
      duration.as_secs_f64()
    );

    // Query for max SPO2
    let start_time = Instant::now();
    let max_spo2_query = format!(r#"SELECT * FROM spo2"#);
    let max_spo2_result = query(DATABASE_NAME, &max_spo2_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Max SPO2: {} (Time taken: {:.3} seconds)",
      max_spo2_result["status"],
      duration.as_secs_f64()
    );

    // Query for stress levels over time
    let start_time = Instant::now();
    let stress_query = format!(r#"SELECT * FROM hrv_table"#);
    let stress_result = query(DATABASE_NAME, &stress_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Stress levels over time: {} (Time taken: {:.3} seconds)",
      stress_result["status"],
      duration.as_secs_f64()
    );
  }

  Ok(())
}

#[allow(dead_code)]
async fn test_ziva_join_query() -> Result<(), Box<dyn std::error::Error>> {
  const STORAGE_PATH: &str = "tmp";
  const DATABASE_NAME: &str = "zivaring";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing JOIN query for user: {} ===", username);
    let start_time = Instant::now();
    let sql_query = "SELECT * FROM activitydetails JOIN spo2 ON to_char(to_timestamp(activitydetails.date), 'YYYY-MM-DD') = to_char(to_timestamp(spo2.date), 'YYYY-MM-DD') LIMIT 100";
    let result = query(DATABASE_NAME, sql_query, None, None).await?;
    let duration = start_time.elapsed();
    println!("Query time: {:.3} seconds", duration.as_secs_f64());
    println!("JOIN Query Result: {} status: {}", result["json_value"], result["status"]);
  }

  Ok(())
}

fn generate_spo2_data(start: &str, end: &str) -> Result<String, Box<dyn std::error::Error>> {
  use chrono::{Duration, NaiveDateTime};
  use serde_json::json;
  use std::time::{SystemTime, UNIX_EPOCH};
  // Simple random number generator using system time
  fn get_random_number(max: u32) -> u32 {
    let seed = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u32;
    seed % max
  }
  let start_time = NaiveDateTime::parse_from_str(start, "%Y-%m-%d %H:%M:%S")?;
  let end_time = NaiveDateTime::parse_from_str(end, "%Y-%m-%d %H:%M:%S")?;
  let mut data = Vec::new();
  let mut current = start_time;
  while current < end_time {
    data.push(json!({
        "date": current.format("%Y-%m-%d %H:%M:%S").to_string(),
        "automaticSpo2Data": get_random_number(100)
    }));
    current = current + Duration::minutes(5);
  }
  // Convert the data to JSON string
  Ok(serde_json::to_string(&data)?)
}

#[allow(dead_code)]
async fn insert_ziva_data_six_months() -> Result<(), Box<dyn std::error::Error>> {
  const STORAGE_PATH: &str = "tmp";
  const USERNAME: &str = "ahmed_test";
  const DATABASE_NAME: &str = "zivaring";
  let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
  let _ = create_database(DATABASE_NAME);

  // SPO2 Table Schema
  let spo2_schema = r#"
    {
      "date": {
        "type": "int",
        "required": true,
        "unique": true,
        "datetime": true
      },
      "automaticSpo2Data": {
        "type": "int"
      }
    }
    "#;
  let spo2_result = create_table(DATABASE_NAME, "spo2", &spo2_schema);
  println!("Create SPO2 table -> {}", spo2_result.unwrap());

  // // SPO2 table Count: 53k rows
  let start: &'static str = "2024-11-01 00:00:00";
  let end = "2025-05-01 00:00:00";
  let _json_data = generate_spo2_data(start, end)?;
  // let insertion_result = insert(DATABASE_NAME, "spo2", &_json_data)?;
  // println!("SPO2 data insertion result: {}", insertion_result);

  const QUERY: &str = "SELECT * FROM spo2 ORDER BY date ASC";
  let start_time = std::time::Instant::now();
  let result = query(DATABASE_NAME, QUERY, None, None).await?;
  let duration = start_time.elapsed();
  println!("Query execution time: {:.3} seconds", duration.as_secs_f64());
  println!("Result: {:?}", result.get("status").unwrap());

  Ok(())
}

#[allow(dead_code)]
async fn test_ziva_range_selction_query() -> Result<(), Box<dyn std::error::Error>> {
  const STORAGE_PATH: &str = "tmp";
  const DATABASE_NAME: &str = "zivaring";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing range selection query for user: {} ===", username);
    let start_time = Instant::now();
    const QUERY_2: &str = "SELECT COUNT(*) AS total FROM activitydetails WHERE partition_date BETWEEN '2025-08-27' AND '2025-09-20'";
    let result = query(DATABASE_NAME, QUERY_2, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Range Selection Result: {} status: {} (Time taken: {:.3} seconds)",
      result["json_value"],
      result["status"],
      duration.as_secs_f64()
    );
  }

  Ok(())
}

#[allow(dead_code)]
async fn test_partition_limit() -> Result<(), Box<dyn std::error::Error>> {
  const STORAGE_PATH: &str = "tmp";
  const USERNAME: &str = "ahmed_test";
  const DATABASE_NAME: &str = "zivaring";
  let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();

  println!("\n=== Testing Partition Limit Functionality ===");

  // Test querying last 3 partitions
  let sql_query = "SELECT COUNT(*) AS total FROM hrv_table";
  let result = query(DATABASE_NAME, sql_query, None, Some(1)).await?;
  println!("Last 2 partitions result: {} status: {}", result["json_value"], result["status"]);

  // Test querying last 7 partitions
  let result2 = query(DATABASE_NAME, sql_query, None, Some(2)).await?;
  println!("Last 3 partitions result: {} status: {}", result2["json_value"], result2["status"]);

  // Test without partition limit (all partitions)
  let result3 = query(DATABASE_NAME, sql_query, None, None).await?;
  println!("All partitions result: {} status: {}", result3["json_value"], result3["status"]);

  Ok(())
}

#[allow(dead_code)]
async fn test_sleep_queries() -> Result<(), Box<dyn std::error::Error>> {
  println!("Testing Sleep Queries for Daily Vitality Score");
  const STORAGE_PATH: &str = "tmp";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing Sleep Queries for user: {} ===", username);

    // Test 1: Get last night's sleep data (minute-by-minute objects)
    println!("\n=== LAST NIGHT'S SLEEP DATA ===");

    let sleep_dates = vec![("2025-09-22", 1758499200, 1758585599)];
    for (date_label, start_ts, end_ts) in &sleep_dates {
      println!("\n--- Sleep data for {} ---", date_label);
      let start_time = Instant::now();

      let last_night_query = format!(
        r#"
        SELECT
          COUNT(DISTINCT start) as sleep_sessions_count,
          COUNT(*) / 60.0 AS sleep_total_hours,
          (
            SELECT MAX(session_minutes) / 60.0
            FROM (
              SELECT start, COUNT(*) as session_minutes
              FROM sleep
              WHERE date BETWEEN {} AND {}
              GROUP BY start
            ) as session_counts
          ) AS sleep_longest_session,
          COUNT(CASE WHEN quality = 1 THEN 1 END) / 60.0 AS sleep_light_hours,
          COUNT(CASE WHEN quality = 2 THEN 1 END) / 60.0 AS sleep_deep_hours,
          COUNT(CASE WHEN quality = 3 THEN 1 END) / 60.0 AS sleep_rem_hours
        FROM sleep
        WHERE date BETWEEN {} AND {}
        "#,
        start_ts, end_ts, start_ts, end_ts
      );

      let last_night_result = query("zivaring", &last_night_query, None, None).await?;
      let duration = start_time.elapsed();
      println!("(Query time: {:.3} seconds)", duration.as_secs_f64());
      // println!("Last night's sleep data: {}", last_night_result["json_value"]);
      let last_night_value = last_night_result["json_value"][0].clone();
      println!(
        "{:.1} hours total sleep, but broken into {} separate sessions - longest only {:.1} hours",
        last_night_value["sleep_total_hours"].as_f64().unwrap_or(0.0),
        last_night_value["sleep_sessions_count"],
        last_night_value["sleep_longest_session"].as_f64().unwrap_or(0.0)
      );
      println!(
        "Deep: {:.1}h * Light: {:.1}h * REM: {:.1}h",
        last_night_value["sleep_deep_hours"].as_f64().unwrap_or(0.0),
        last_night_value["sleep_light_hours"].as_f64().unwrap_or(0.0),
        last_night_value["sleep_rem_hours"].as_f64().unwrap_or(0.0)
      );
    }

    // Test 2: Get sleep consistency data (previous 6 nights)
    println!("\n=== SLEEP CONSISTENCY DATA (Previous 6 nights) ===");

    // For sleep consistency, we need to analyze sleep sessions from each of the previous 6 days
    // Let's query each day separately to get sleep sessions per day
    let consistency_dates = vec![
      ("2025-09-22", 1758499200, 1758585599), // Sep 22: 00:00 to 23:59
      ("2025-09-21", 1758412800, 1758499199), // Sep 21: 00:00 to 23:59
      ("2025-09-20", 1758326400, 1758412799), // Sep 20: 00:00 to 23:59
      ("2025-09-19", 1758240000, 1758326399), // Sep 19: 00:00 to 23:59
      ("2025-09-18", 1758153600, 1758239999), // Sep 18: 00:00 to 23:59
      ("2025-09-17", 1758067200, 1758153599), // Sep 17: 00:00 to 23:59
    ];

    // Collect all consistency data first
    let mut consistency_data = Vec::new();
    for (date_label, start_ts, end_ts) in &consistency_dates {
      let day_start_time = Instant::now();
      let day_consistency_query = format!(
        r#"
        SELECT
          '{}' as date,
          COUNT(DISTINCT start) as sessions_count,
          COALESCE(SUM(session_duration), 0) as total_sleep_minutes
        FROM (
          SELECT start, COUNT(*) as session_duration
          FROM sleep
          WHERE date BETWEEN {} AND {}
          GROUP BY start
        ) as daily_sessions
      "#,
        date_label, start_ts, end_ts
      );

      let day_result = query("zivaring", &day_consistency_query, None, None).await?;
      let day_duration = day_start_time.elapsed();
      println!(
        "day_result {} status: {} (Time: {:.3}s) \n",
        day_result["json_value"],
        day_result["status"],
        day_duration.as_secs_f64()
      );
      if let Some(day_data) = day_result["json_value"].as_array().and_then(|arr| arr.get(0)) {
        if let (Some(sessions), Some(minutes)) = (day_data["sessions_count"].as_i64(), day_data["total_sleep_minutes"].as_i64()) {
          consistency_data.push((sessions, minutes));
        }
      }
    }

    // Calculate sleep consistency score based on variance
    if consistency_data.len() >= 3 {
      let durations: Vec<f64> = consistency_data.iter().map(|(_, minutes)| *minutes as f64).collect();

      let mean = durations.iter().sum::<f64>() / durations.len() as f64;
      let variance = durations.iter().map(|duration| (duration - mean).powi(2)).sum::<f64>() / durations.len() as f64;
      let stddev = variance.sqrt();

      // Calculate consistency score based on standard deviation thresholds
      let consistency_score = if stddev <= 30.0 {
        100 // Excellent consistency (±30 min)
      } else if stddev <= 60.0 {
        75 // Good consistency (±60 min)
      } else if stddev <= 90.0 {
        50 // Fair consistency (±90 min)
      } else {
        25 // Poor consistency (>90 min)
      };

      // Determine consistency message
      let consistency_message = if consistency_score >= 90 {
        "Excellent sleep consistency this week"
      } else if consistency_score >= 70 {
        "Good sleep routine maintained"
      } else if consistency_score >= 50 {
        "Sleep schedule somewhat variable"
      } else {
        "Irregular sleep pattern - try consistent bedtime"
      };

      println!("\n=== SLEEP CONSISTENCY SCORE ===");
      println!("Duration Standard Deviation: {:.1} minutes", stddev);
      println!("Consistency Score: {}/100 ({})", consistency_score, consistency_message);

      // Display individual night data
      println!("\n=== INDIVIDUAL NIGHT DATA ===");
      for (i, (date_label, _, _)) in consistency_dates.iter().enumerate() {
        if i < consistency_data.len() {
          let (sessions, minutes) = consistency_data[i];
          println!(
            "{}: {} sessions, {} minutes ({:.1}h)",
            date_label,
            sessions,
            minutes,
            minutes as f64 / 60.0
          );
        }
      }
    } else {
      println!("\n=== SLEEP CONSISTENCY SCORE ===");
      println!("Insufficient data for consistency scoring (need at least 3 nights)");
      println!("Available data points: {}", consistency_data.len());
    }
  }

  Ok(())
}

#[allow(dead_code)]
async fn test_hrv_queries() -> Result<(), Box<dyn std::error::Error>> {
  println!("\n=== HRV QUERIES FOR RECOVERY COMPONENT ===");
  const STORAGE_PATH: &str = "tmp";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing HRV queries for user: {} ===", username);
    let start_time = Instant::now();

    let rhr_hrv_query = r#"
  WITH date_params AS (
  SELECT 
      '2025-10-01'::DATE as target_date_local,
      'America/Los_Angeles' as user_timezone,
      
      ('2025-10-01'::DATE + INTERVAL '7 hours') as day_start_utc,
      ('2025-10-01'::DATE + INTERVAL '1 day' + INTERVAL '7 hours') as day_end_utc,
      
      ('2025-10-01'::DATE - INTERVAL '30 days' + INTERVAL '7 hours') as baseline_start_utc,
      ('2025-10-01'::DATE + INTERVAL '7 hours') as baseline_end_utc
  ),

  today_hrv_data AS (
      SELECT 
          TO_TIMESTAMP(h.date) as hrv_timestamp,
          h.hrv,
          h.stress,
          h."heartRate"
      FROM hrv_table h
      CROSS JOIN date_params dp
      WHERE TO_TIMESTAMP(h.date) 
          BETWEEN dp.day_start_utc AND dp.day_end_utc
      AND h.hrv > 0
      AND h.hrv < 200
  ),

  today_hrv_summary AS (
      SELECT 
          COUNT(*) as reading_count,
          AVG(hrv) as avg_hrv,
          STDDEV(hrv) as hrv_stddev,
          MIN(hrv) as min_hrv,
          MAX(hrv) as max_hrv,
          AVG(stress) as avg_stress,
          AVG("heartRate") as avg_heart_rate
      FROM today_hrv_data
  ),

  baseline_hrv_data AS (
      SELECT 
          DATE_TRUNC('day', TO_TIMESTAMP(h.date)) as day_utc,
          h.hrv
      FROM hrv_table h
      CROSS JOIN date_params dp
      WHERE TO_TIMESTAMP(h.date) 
          BETWEEN dp.baseline_start_utc AND dp.day_start_utc
      AND h.hrv > 0
      AND h.hrv < 200
  ),

  daily_baseline_hrv AS (
      SELECT 
          day_utc,
          AVG(hrv) as daily_avg_hrv,
          COUNT(*) as daily_reading_count
      FROM baseline_hrv_data
      GROUP BY day_utc
      HAVING COUNT(*) >= 3
  ),

  baseline_stats AS (
      SELECT 
          AVG(daily_avg_hrv) as baseline_hrv,
          STDDEV(daily_avg_hrv) as baseline_stddev,
          MIN(daily_avg_hrv) as baseline_min,
          MAX(daily_avg_hrv) as baseline_max,
          COUNT(*) as baseline_days_count
      FROM daily_baseline_hrv
  ),

  hrv_recovery_calculation AS (
      SELECT 
          th.avg_hrv as today_hrv,
          th.reading_count as today_readings,
          th.hrv_stddev as today_stddev,
          th.min_hrv as today_min,
          th.max_hrv as today_max,
          th.avg_stress as today_stress,
          th.avg_heart_rate as today_heart_rate,
          
          bs.baseline_hrv,
          bs.baseline_stddev,
          bs.baseline_min,
          bs.baseline_max,
          bs.baseline_days_count,
          
          CASE 
              WHEN bs.baseline_stddev > 0 AND th.avg_hrv IS NOT NULL THEN
                  (th.avg_hrv - bs.baseline_hrv) / bs.baseline_stddev
              ELSE NULL
          END as z_score,
          
          CASE 
              WHEN th.avg_hrv IS NOT NULL AND bs.baseline_hrv IS NOT NULL THEN
                  th.avg_hrv - bs.baseline_hrv
              ELSE NULL
          END as hrv_deviation,
          
          CASE 
              WHEN th.avg_hrv IS NOT NULL AND bs.baseline_hrv IS NOT NULL AND bs.baseline_hrv > 0 THEN
                  ((th.avg_hrv - bs.baseline_hrv) / bs.baseline_hrv) * 100
              ELSE NULL
          END as hrv_percent_change
          
      FROM today_hrv_summary th
      CROSS JOIN baseline_stats bs
  ),

  hrv_recovery_score AS (
      SELECT 
          *,
          CASE 
              WHEN z_score IS NULL THEN NULL
              WHEN baseline_days_count < 7 THEN 75
              WHEN z_score >= 1.0 THEN 100
              WHEN z_score >= 0.5 THEN 85 + ((z_score - 0.5) * 30)
              WHEN z_score >= 0 THEN 70 + (z_score * 30)
              WHEN z_score >= -0.5 THEN 50 + ((z_score + 0.5) * 40)
              WHEN z_score >= -1.0 THEN 25 + ((z_score + 1.0) * 50)
              ELSE 25
          END as recovery_score,
          
          CASE 
              WHEN z_score IS NULL THEN 'No Data'
              WHEN baseline_days_count < 7 THEN 'Building Baseline'
              WHEN z_score >= 0.5 THEN 'High Recovery'
              WHEN z_score >= -0.5 THEN 'Normal Recovery'
              ELSE 'Low Recovery'
          END as recovery_status,
          
          CASE 
              WHEN z_score IS NULL THEN 'No HRV data available for today'
              WHEN baseline_days_count < 7 THEN 
                  'Building your baseline (' || baseline_days_count || ' of 30 days)'
              WHEN z_score >= 1.0 THEN 
                  'Outstanding recovery! Your HRV is significantly above baseline (+' || 
                  ROUND(hrv_percent_change, 1) || '%)'
              WHEN z_score >= 0.5 THEN 
                  'Excellent recovery. Your body is well-rested (+' || 
                  ROUND(hrv_percent_change, 1) || '%)'
              WHEN z_score >= 0 THEN 
                  'Good recovery. You are ready for normal activities'
              WHEN z_score >= -0.5 THEN 
                  'Normal recovery. Your body is functioning well'
              WHEN z_score >= -1.0 THEN 
                  'Below average recovery. Consider lighter activities today'
              ELSE 
                  'Low recovery. Your body needs rest. Focus on recovery today'
          END as recovery_message,
          
          CASE 
              WHEN z_score IS NULL THEN 'Sync your ring for activity guidance'
              WHEN baseline_days_count < 7 THEN 'Normal activities are fine'
              WHEN z_score >= 0.5 THEN 'Perfect day for intense training or challenging work'
              WHEN z_score >= 0 THEN 'Good day for moderate exercise and productive work'
              WHEN z_score >= -0.5 THEN 'Stick to light to moderate activities'
              ELSE 'Prioritize rest, recovery, and light movement only'
          END as activity_recommendation
          
      FROM hrv_recovery_calculation
  )

  SELECT 
      dp.target_date_local as date,
      
      ROUND(hrs.today_hrv, 1) as hrv_ms,
      hrs.today_readings as hrv_reading_count,
      ROUND(hrs.today_min, 1) as hrv_min,
      ROUND(hrs.today_max, 1) as hrv_max,
      ROUND(hrs.today_stddev, 1) as hrv_stddev,
      
      ROUND(hrs.today_stress, 1) as stress_level,
      ROUND(hrs.today_heart_rate, 1) as avg_heart_rate,
      
      ROUND(hrs.baseline_hrv, 1) as baseline_hrv_30day,
      ROUND(hrs.baseline_stddev, 1) as baseline_stddev,
      ROUND(hrs.baseline_min, 1) as baseline_min,
      ROUND(hrs.baseline_max, 1) as baseline_max,
      hrs.baseline_days_count as baseline_days,
      
      ROUND(hrs.z_score, 2) as z_score,
      ROUND(hrs.hrv_deviation, 1) as hrv_change_ms,
      ROUND(hrs.hrv_percent_change, 1) as hrv_change_percent,
      
      hrs.recovery_status as status,
      ROUND(hrs.recovery_score, 0) as recovery_component_score,
      hrs.recovery_message as message,
      hrs.activity_recommendation as recommendation,
      
      CASE 
          WHEN hrs.today_readings >= 12 THEN 'High'
          WHEN hrs.today_readings >= 6 THEN 'Medium'
          WHEN hrs.today_readings >= 3 THEN 'Low'
          ELSE 'Very Low'
      END as data_quality,
      
      now() as calculated_at_utc

  FROM date_params dp
  CROSS JOIN hrv_recovery_score hrs;
  "#;

    let hrv_score_result = query("zivaring", rhr_hrv_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Resting Heart Rate Result: {} status: {} (Time taken: {:.3} seconds)",
      hrv_score_result["json_value"],
      hrv_score_result["status"],
      duration.as_secs_f64()
    );
  }

  Ok(())
}

#[allow(dead_code)]
async fn test_rhr_queries() -> Result<(), Box<dyn std::error::Error>> {
  println!("\n=== RHR QUERIES FOR HEART HEALTH COMPONENT ===");
  const STORAGE_PATH: &str = "tmp";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing RHR queries for user: {} ===", username);
    let start_time = Instant::now();

    let rhr_sql_query = r#"
  WITH date_params AS (
    SELECT
      to_timestamp('2025-09-01T00:00:00') AS target_date_utc,
      'America/Los_Angeles' AS user_timezone,

      to_timestamp('2025-09-01T01:00:00') AS sleep_window_start_utc,
      to_timestamp('2025-09-01T13:00:00') AS sleep_window_end_utc,

      to_timestamp('2025-09-01T07:00:00') AS day_start_utc,
      to_timestamp('2025-10-01T07:00:00') AS day_end_utc
  ),

  -- sleep timestamps (epoch seconds as BIGINT + timestamp)
  sleep_timestamps AS (
    SELECT
      CAST(s.date AS BIGINT) AS sleep_epoch,
      to_timestamp_seconds(CAST(s.date AS BIGINT)) AS sleep_timestamp_utc
    FROM sleep s
    CROSS JOIN date_params dp
    WHERE to_timestamp_seconds(CAST(s.date AS BIGINT))
      BETWEEN dp.sleep_window_start_utc AND dp.sleep_window_end_utc
  ),

  -- heart rate rows that align to a sleep timestamp within +/-30s
  hr_during_sleep AS (
    SELECT
      hr."singleHR" AS heart_rate,
      CAST(hr.date AS BIGINT) AS hr_epoch,
      to_timestamp_seconds(CAST(hr.date AS BIGINT)) AS hr_timestamp_utc
    FROM heartrate hr
    WHERE hr."singleHR" BETWEEN 40 AND 120
      AND EXISTS (
        SELECT 1
        FROM sleep_timestamps st
        WHERE ABS(CAST(hr.date AS BIGINT) - st.sleep_epoch) <= 30
      )
  ),

  -- SAFE pattern: only compute aggregates if count > 0
  sleep_based_rhr AS (
    SELECT
      COUNT(*) AS reading_count,
      CASE WHEN COUNT(*) > 0 THEN APPROX_PERCENTILE_CONT(heart_rate, 0.2) ELSE NULL END AS rhr_value,
      CASE WHEN COUNT(*) > 0 THEN AVG(heart_rate) ELSE NULL END AS avg_hr,
      CASE WHEN COUNT(*) > 0 THEN MIN(heart_rate) ELSE NULL END AS min_hr,
      CASE WHEN COUNT(*) > 0 THEN MAX(heart_rate) ELSE NULL END AS max_hr,
      'sleep_based' AS rhr_source
    FROM hr_during_sleep
  ),

  -- all-day hr rows (filtered)
  hr_all_day AS (
    SELECT
      hr."singleHR" AS heart_rate
    FROM heartrate hr
    WHERE hr."singleHR" BETWEEN 40 AND 120
      AND to_timestamp_seconds(CAST(hr.date AS BIGINT))
        BETWEEN (SELECT day_start_utc FROM date_params)
        AND (SELECT day_end_utc FROM date_params)
  ),

  all_day_rhr AS (
    SELECT
      COUNT(*) AS reading_count,
      CASE WHEN COUNT(*) > 0 THEN APPROX_PERCENTILE_CONT(heart_rate, 0.2) ELSE NULL END AS rhr_value,
      CASE WHEN COUNT(*) > 0 THEN AVG(heart_rate) ELSE NULL END AS avg_hr,
      CASE WHEN COUNT(*) > 0 THEN MIN(heart_rate) ELSE NULL END AS min_hr,
      CASE WHEN COUNT(*) > 0 THEN MAX(heart_rate) ELSE NULL END AS max_hr,
      'all_day_fallback' AS rhr_source
    FROM hr_all_day
  ),

  today_rhr AS (
    SELECT
      COALESCE(
        CASE WHEN sb.reading_count >= 10 THEN sb.rhr_value END,
        CASE WHEN ad.reading_count >= 10 THEN ad.rhr_value END
      ) AS rhr_value,

      COALESCE(
        CASE WHEN sb.reading_count >= 10 THEN sb.reading_count END,
        CASE WHEN ad.reading_count >= 10 THEN ad.reading_count END,
        0
      ) AS reading_count,

      COALESCE(
        CASE WHEN sb.reading_count >= 10 THEN sb.rhr_source END,
        CASE WHEN ad.reading_count >= 10 THEN ad.rhr_source END,
        'no_data'
      ) AS rhr_source,

      COALESCE(
        CASE WHEN sb.reading_count >= 10 THEN sb.avg_hr END,
        CASE WHEN ad.reading_count >= 10 THEN ad.avg_hr END
      ) AS avg_hr,

      COALESCE(
        CASE WHEN sb.reading_count >= 10 THEN sb.min_hr END,
        CASE WHEN ad.reading_count >= 10 THEN ad.min_hr END
      ) AS min_hr,

      COALESCE(
        CASE WHEN sb.reading_count >= 10 THEN sb.max_hr END,
        CASE WHEN ad.reading_count >= 10 THEN ad.max_hr END
      ) AS max_hr,

      sb.reading_count AS sleep_reading_count,
      sb.rhr_value AS sleep_rhr_value,
      ad.reading_count AS all_day_reading_count,
      ad.rhr_value AS all_day_rhr_value
    FROM sleep_based_rhr sb
    CROSS JOIN all_day_rhr ad
  ),

  -- baseline per day: compute daily counts + percentile safely per day
  baseline_rhr_data AS (
    SELECT
      DATE_TRUNC('day', to_timestamp_seconds(CAST(hr.date AS BIGINT))) AS day_utc,
      COUNT(hr."singleHR") AS cnt,
      CASE WHEN COUNT(hr."singleHR") > 0
          THEN APPROX_PERCENTILE_CONT(hr."singleHR", 0.2)
          ELSE NULL END AS daily_rhr
    FROM heartrate hr
    WHERE to_timestamp_seconds(CAST(hr.date AS BIGINT))
      BETWEEN to_timestamp('2025-09-23T07:00:00') AND to_timestamp('2025-09-01T06:59:59')
      AND hr."singleHR" BETWEEN 40 AND 120
    GROUP BY DATE_TRUNC('day', to_timestamp_seconds(CAST(hr.date AS BIGINT)))
  ),

  baseline_rhr AS (
    SELECT
      AVG(daily_rhr) AS baseline_rhr_value,
      STDDEV(daily_rhr) AS baseline_rhr_stddev,
      COUNT(*) AS baseline_days_count
    FROM baseline_rhr_data
  ),

  rhr_analysis AS (
    SELECT
      tr.rhr_value AS today_rhr,
      tr.reading_count,
      tr.rhr_source,
      tr.avg_hr,
      tr.min_hr,
      tr.max_hr,
      tr.sleep_reading_count,
      tr.sleep_rhr_value,
      tr.all_day_reading_count,
      tr.all_day_rhr_value,
      br.baseline_rhr_value,
      br.baseline_rhr_stddev,
      br.baseline_days_count,
      ROUND(tr.rhr_value - br.baseline_rhr_value, 1) AS rhr_change_bpm,

      CASE
        WHEN tr.rhr_value IS NULL THEN 'No Data'
        WHEN br.baseline_days_count < 3 THEN 'Building Baseline'
        WHEN (tr.rhr_value - br.baseline_rhr_value) <= -3 THEN 'Improving'
        WHEN ABS(tr.rhr_value - br.baseline_rhr_value) <= 3 THEN 'Stable'
        WHEN (tr.rhr_value - br.baseline_rhr_value) <= 5 THEN 'Slightly Elevated'
        WHEN (tr.rhr_value - br.baseline_rhr_value) <= 10 THEN 'Elevated'
        ELSE 'High'
      END AS rhr_status,

      CASE
        WHEN tr.rhr_value IS NULL THEN NULL
        WHEN br.baseline_days_count < 3 THEN 75
        WHEN (tr.rhr_value - br.baseline_rhr_value) <= -3 THEN 100
        WHEN ABS(tr.rhr_value - br.baseline_rhr_value) <= 3 THEN 85
        WHEN (tr.rhr_value - br.baseline_rhr_value) <= 5 THEN 70
        WHEN (tr.rhr_value - br.baseline_rhr_value) <= 10 THEN 50
        ELSE 30
      END AS rhr_score
    FROM today_rhr tr
    CROSS JOIN baseline_rhr br
  )

  SELECT
    dp.target_date_utc AS date,
    ROUND(ra.today_rhr, 1) AS resting_heart_rate_bpm,
    ra.rhr_source AS calculation_method,
    ra.reading_count AS hr_readings_used,
    ROUND(ra.avg_hr, 1) AS average_heart_rate,
    ROUND(ra.min_hr, 1) AS minimum_heart_rate,
    ROUND(ra.max_hr, 1) AS maximum_heart_rate,
    ROUND(ra.sleep_rhr_value, 1) AS sleep_based_rhr,
    ra.sleep_reading_count AS sleep_hr_count,
    ROUND(ra.all_day_rhr_value, 1) AS all_day_rhr,
    ra.all_day_reading_count AS all_day_hr_count,
    ROUND(ra.baseline_rhr_value, 1) AS baseline_rhr_7day,
    ROUND(ra.baseline_rhr_stddev, 1) AS baseline_stddev,
    ra.baseline_days_count AS baseline_days,
    ra.rhr_change_bpm AS rhr_change,
    ra.rhr_status AS status,
    ra.rhr_score AS rhr_component_score,
    dp.sleep_window_start_utc,
    dp.sleep_window_end_utc,
    now() AS calculated_at_utc
  FROM date_params dp
  CROSS JOIN rhr_analysis ra;
  "#;
    let rhr_score_result = query("zivaring", rhr_sql_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "Resting Heart Rate Result: {} status: {} (Time taken: {:.3} seconds)",
      rhr_score_result["json_value"],
      rhr_score_result["status"],
      duration.as_secs_f64()
    );
  }

  Ok(())
}

#[allow(dead_code)]
async fn test_vitality_queries() -> Result<(), Box<dyn std::error::Error>> {
  println!("\n=== VITALITY QUERIES COMPONENT ===");
  const STORAGE_PATH: &str = "tmp";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing Vitality queries for user: {} ===", username);
    let start_time = Instant::now();

    let vitality_sql_query = r#"
  WITH date_params AS (
      SELECT
          to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC' as now_utc,
          DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') as today_utc,
          DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '1 day' as yesterday_utc,
          DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '7 days' as week_ago,
          DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '14 days' as two_weeks_ago,
          DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '30 days' as month_ago,
          -- Sleep window: yesterday 6pm to today 6pm
          (DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') - INTERVAL '1 day') + INTERVAL '18 hours' as sleep_window_start,
          DATE_TRUNC('day', to_timestamp('2025-09-22T00:00:00') AT TIME ZONE 'UTC') + INTERVAL '18 hours' as sleep_window_end,
          65 as user_age
  ),

  hrv_component AS (
      WITH hrv_data_availability AS (
          SELECT
              COUNT(DISTINCT DATE_TRUNC('day', TO_TIMESTAMP(date))) as days_available
          FROM hrv_table
          WHERE TO_TIMESTAMP(date) >= (SELECT month_ago FROM date_params)
              AND hrv BETWEEN 1 AND 200
      ),

      progressive_baseline AS (
          SELECT
              da.days_available,
              CASE
                  WHEN da.days_available >= 30 THEN 30
                  WHEN da.days_available >= 14 THEN da.days_available
                  WHEN da.days_available >= 7 THEN da.days_available
                  WHEN da.days_available >= 3 THEN da.days_available
                  WHEN da.days_available >= 1 THEN da.days_available
                  ELSE 0
              END as baseline_days,

              CASE
                  WHEN da.days_available >= 30 THEN 1.0
                  WHEN da.days_available >= 14 THEN 0.9
                  WHEN da.days_available >= 7 THEN 0.75
                  WHEN da.days_available >= 3 THEN 0.5
                  WHEN da.days_available >= 1 THEN 0.3
                  ELSE 0.0
              END as confidence_factor,

              CASE
                  WHEN da.days_available >= 30 THEN 'Your recovery'
                  WHEN da.days_available >= 14 THEN 'Your recovery (personalizing)'
                  WHEN da.days_available >= 7 THEN 'Recovery (learning your pattern)'
                  WHEN da.days_available >= 3 THEN 'Early recovery data'
                  WHEN da.days_available >= 1 THEN 'Recovery: Calculating...'
                  ELSE 'No recovery data'
              END as confidence_message
          FROM hrv_data_availability da
      ),

      baseline_stats AS (
          SELECT
              pb.baseline_days,
              pb.confidence_factor,
              pb.confidence_message,
              AVG(h.hrv) as baseline_mean,
              STDDEV(h.hrv) as baseline_stddev
          FROM hrv_table h
          CROSS JOIN progressive_baseline pb
          WHERE TO_TIMESTAMP(h.date) >=
                (SELECT today_utc FROM date_params) - INTERVAL '30' day
              AND h.hrv BETWEEN 1 AND 200
          GROUP BY pb.baseline_days, pb.confidence_factor, pb.confidence_message
      ),

      today_hrv AS (
          SELECT
              AVG(hrv) as today_mean,
              AVG(stress) as today_stress,
              COUNT(*) as readings
          FROM hrv_table
          WHERE DATE_TRUNC('day', TO_TIMESTAMP(date)) =
                (SELECT today_utc FROM date_params)
              AND hrv BETWEEN 1 AND 200
      )
      
      SELECT 
          t.today_mean as current_hrv,
          t.readings as hrv_readings,
          b.baseline_mean,
          b.baseline_stddev,
          b.confidence_factor,
          b.confidence_message,
          t.today_stress,
          
          CASE 
              WHEN b.baseline_stddev > 0 AND t.today_mean IS NOT NULL THEN
                  ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor
              ELSE 0.0
          END as adjusted_z_score,
          
          CASE
              WHEN b.baseline_stddev = 0 OR t.today_mean IS NULL THEN NULL
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 1.0 
                  THEN 100.0
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.5 
                  THEN 85.0 + (((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor - 0.5) * 30.0
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.0 
                  THEN 70.0 + ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor * 30.0
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -0.5 
                  THEN 50.0 + (((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor + 0.5) * 40.0
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -1.0 
                  THEN 25.0 + (((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor + 1.0) * 50.0
              ELSE 25.0
          END as recovery_score_100,
          
          CASE
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 1.0 
                  THEN 'Excellent'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.5 
                  THEN 'Great'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.0 
                  THEN 'Good'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -0.5 
                  THEN 'Fair'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -1.0 
                  THEN 'Low'
              ELSE 'Poor'
          END as recovery_status,
          
          CASE
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 1.0 
                  THEN 'Your body is fully recovered'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.5 
                  THEN 'Strong recovery, ready for activity'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= 0.0 
                  THEN 'Normal recovery level'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -0.5 
                  THEN 'Adequate recovery'
              WHEN ((t.today_mean - b.baseline_mean) / b.baseline_stddev) * b.confidence_factor >= -1.0 
                  THEN 'Below normal recovery'
              ELSE 'Prioritize rest and recovery'
          END as recovery_message
          
      FROM today_hrv t
      CROSS JOIN baseline_stats b
  ),

  sleep_quality_component AS (
      WITH last_night_sleep AS (
          SELECT
              COUNT(*) as total_minutes,
              COUNT(DISTINCT start) as session_count,
              SUM(CASE WHEN quality = 3 THEN 1 ELSE 0 END) as deep_minutes,
              SUM(CASE WHEN quality = 5 THEN 1 ELSE 0 END) as rem_minutes,
              SUM(CASE WHEN quality = 2 THEN 1 ELSE 0 END) as light_minutes,
              SUM(CASE WHEN quality = 1 THEN 1 ELSE 0 END) as awake_minutes
          FROM sleep
          WHERE TO_TIMESTAMP(date) >=
                (SELECT sleep_window_start FROM date_params)
              AND TO_TIMESTAMP(date) <
                (SELECT sleep_window_end FROM date_params)
      ),
      
      sleep_scoring AS (
          SELECT 
              ls.*,
              dp.user_age,
              CASE 
                  WHEN dp.user_age >= 65 THEN 420.0
                  WHEN dp.user_age >= 50 THEN 390.0
                  ELSE 420.0
              END as min_optimal,
              CASE 
                  WHEN dp.user_age >= 65 THEN 540.0
                  WHEN dp.user_age >= 50 THEN 480.0
                  ELSE 510.0
              END as max_optimal
          FROM last_night_sleep ls
          CROSS JOIN date_params dp
      ),
      
      sleep_calculation AS (
          SELECT 
              total_minutes,
              session_count,
              deep_minutes,
              rem_minutes,
              light_minutes,
              awake_minutes,
              
              -- Duration scoring
              CASE 
                  WHEN total_minutes BETWEEN min_optimal AND max_optimal THEN 100.0
                  WHEN total_minutes < min_optimal THEN
                      GREATEST(25.0, 100.0 - ((min_optimal - total_minutes) * 0.5))
                  WHEN total_minutes <= 600 THEN 90.0
                  ELSE 75.0
              END as duration_score,
              
              -- Continuity scoring
              CASE 
                  WHEN session_count = 1 THEN 100.0
                  WHEN session_count = 2 THEN 85.0
                  WHEN session_count <= 4 THEN 70.0
                  WHEN session_count <= 6 THEN 55.0
                  ELSE 40.0
              END as continuity_score,
              
              -- Quality based on sleep architecture
              CASE 
                  WHEN total_minutes = 0 THEN 0.0
                  WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.25 THEN 100.0
                  WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.20 THEN 85.0
                  WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.15 THEN 70.0
                  WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.10 THEN 55.0
                  ELSE 40.0
              END as architecture_score,
              
              CASE 
                  WHEN total_minutes BETWEEN min_optimal AND max_optimal THEN 'Perfect sleep duration'
                  WHEN total_minutes < min_optimal - 60 THEN 'Significantly under-slept'
                  WHEN total_minutes < min_optimal THEN 'Less sleep than ideal'
                  WHEN total_minutes > 600 THEN 'Very long sleep - check energy levels'
                  WHEN total_minutes > max_optimal THEN 'Extended sleep detected'
                  ELSE 'Good sleep duration'
              END as duration_message,
              
              CASE 
                  WHEN session_count = 1 THEN 'Excellent sleep continuity'
                  WHEN session_count = 2 THEN 'Minimal interruptions'
                  WHEN session_count <= 4 THEN 'Some sleep fragmentation'
                  WHEN session_count <= 6 THEN 'Fragmented sleep'
                  ELSE 'Highly fragmented sleep'
              END as continuity_message,
              
              CASE 
                  WHEN total_minutes = 0 THEN 'No sleep data'
                  WHEN (deep_minutes + rem_minutes) * 1.0 / total_minutes >= 0.20 
                      THEN CONCAT('Good restorative sleep (', 
                          CAST(ROUND((deep_minutes + rem_minutes) * 100.0 / total_minutes, 0) AS VARCHAR),
                          '% deep+REM)')
                  ELSE CONCAT('Limited restorative sleep (', 
                      CAST(ROUND((deep_minutes + rem_minutes) * 100.0 / total_minutes, 0) AS VARCHAR),
                      '% deep+REM)')
              END as architecture_message,
              
              CASE 
                  WHEN total_minutes < min_optimal - 120 THEN 'very_short_sleep'
                  WHEN total_minutes < min_optimal - 60 THEN 'short_sleep'
                  WHEN total_minutes > 720 THEN 'excessive_sleep'
                  WHEN total_minutes > 600 THEN 'oversleep'
                  ELSE NULL
              END as sleep_alert
              
          FROM sleep_scoring
      )
      
      SELECT 
          total_minutes,
          session_count,
          deep_minutes,
          rem_minutes,
          light_minutes,
          awake_minutes,
          duration_score,
          continuity_score,
          architecture_score,
          duration_message,
          continuity_message,
          architecture_message,
          sleep_alert,
          (duration_score * 0.4 + continuity_score * 0.3 + architecture_score * 0.3) as sleep_quality_score_100
      FROM sleep_calculation
  ),

  sleep_consistency_component AS (
      WITH previous_nights AS (
          SELECT
              DATE_TRUNC('day', TO_TIMESTAMP(date)) as sleep_date,
              COUNT(*) as night_minutes,
              COUNT(DISTINCT start) as sessions
          FROM sleep
          WHERE TO_TIMESTAMP(date) >=
                (SELECT week_ago FROM date_params)
              AND TO_TIMESTAMP(date) <
                (SELECT sleep_window_start FROM date_params)
          GROUP BY DATE_TRUNC('day', TO_TIMESTAMP(date))
      ),
      
      consistency_stats AS (
          SELECT 
              COUNT(*) as nights_count,
              AVG(night_minutes) as avg_duration,
              STDDEV(night_minutes) as duration_variance
          FROM previous_nights
      ),
      
      consistency_scoring AS (
          SELECT 
              *,
              CASE 
                  WHEN avg_duration BETWEEN 420 AND 480 THEN 100.0
                  WHEN avg_duration BETWEEN 360 AND 540 THEN 80.0
                  ELSE 60.0
              END as duration_score,
              
              CASE 
                  WHEN duration_variance <= 30 THEN 100.0
                  WHEN duration_variance <= 60 THEN 75.0
                  ELSE 50.0
              END as variance_score,
              
              CASE 
                  WHEN duration_variance <= 30 
                      THEN 'Excellent sleep consistency this week'
                  WHEN duration_variance <= 60 
                      THEN 'Good sleep routine maintained'
                  WHEN duration_variance <= 90 
                      THEN 'Sleep schedule somewhat variable'
                  ELSE 'Irregular sleep pattern - try consistent bedtime'
              END as consistency_message
              
          FROM consistency_stats
      )
      
      SELECT 
          nights_count,
          avg_duration,
          duration_variance,
          consistency_message,
          CASE 
              WHEN nights_count >= 3 THEN 
                  (duration_score * 0.5 + variance_score * 0.5)
              ELSE NULL
          END as consistency_score_100,
          CASE 
              WHEN nights_count < 3 THEN 'building_sleep_patterns'
              ELSE NULL
          END as consistency_alert
      FROM consistency_scoring
  ),

  rhr_component AS (
      WITH daily_hr_readings AS (
          SELECT
              DATE_TRUNC('day', TO_TIMESTAMP(date)) as hr_date,
              "singleHR"
          FROM heartrate
          WHERE TO_TIMESTAMP(date) >=
                (SELECT week_ago FROM date_params)
              AND "singleHR" BETWEEN 40 AND 120
      ),

      daily_rhr AS (
          SELECT
              hr_date,
              APPROX_PERCENTILE_CONT("singleHR", 0.2) as rhr_20th
          FROM daily_hr_readings
          GROUP BY hr_date
      ),

      rhr_analysis AS (
          SELECT
              AVG(CASE WHEN hr_date < (SELECT today_utc FROM date_params)
                  THEN rhr_20th END) as baseline_rhr,
              MAX(CASE WHEN hr_date = (SELECT today_utc FROM date_params)
                  THEN rhr_20th END) as today_rhr
          FROM daily_rhr
      ),

      rhr_scoring AS (
          SELECT
              baseline_rhr,
              today_rhr,
              today_rhr - baseline_rhr as rhr_change,

              CASE
                  WHEN today_rhr - baseline_rhr <= -3 THEN 100.0
                  WHEN today_rhr - baseline_rhr <= 3 THEN 85.0
                  WHEN today_rhr - baseline_rhr <= 5 THEN 70.0
                  WHEN today_rhr - baseline_rhr <= 10 THEN 50.0
                  ELSE 30.0
              END as rhr_score_100,

              CASE
                  WHEN today_rhr - baseline_rhr <= -3 THEN 'Improving'
                  WHEN today_rhr - baseline_rhr <= 3 THEN 'Stable'
                  WHEN today_rhr - baseline_rhr <= 5 THEN 'Slightly Elevated'
                  WHEN today_rhr - baseline_rhr <= 10 THEN 'Elevated'
                  ELSE 'High'
              END as rhr_status,

              CASE
                  WHEN today_rhr - baseline_rhr <= -3 THEN 'Heart rate improving'
                  WHEN today_rhr - baseline_rhr <= 3 THEN 'Heart rate stable'
                  WHEN today_rhr - baseline_rhr <= 5 THEN 'Slightly elevated heart rate'
                  WHEN today_rhr - baseline_rhr <= 10 THEN 'Heart rate elevated - monitor'
                  ELSE 'Significant heart rate elevation'
              END as rhr_message,

              CASE
                  WHEN today_rhr - baseline_rhr > 10 THEN 'rhr_high_alert'
                  WHEN today_rhr - baseline_rhr > 5 THEN 'rhr_elevated'
                  ELSE NULL
              END as rhr_alert

          FROM rhr_analysis
      )

      SELECT * FROM rhr_scoring
  ),

  safety_modifiers AS (
      WITH temp_analysis AS (
          SELECT
              AVG(CASE WHEN DATE_TRUNC('day', TO_TIMESTAMP(date)) =
                  (SELECT today_utc FROM date_params)
                  THEN temperature END) as today_temp,
              AVG(CASE WHEN DATE_TRUNC('day', TO_TIMESTAMP(date)) <
                  (SELECT today_utc FROM date_params)
                  THEN temperature END) as baseline_temp
          FROM temperature_readings
          WHERE TO_TIMESTAMP(date) >=
                (SELECT two_weeks_ago FROM date_params)
              AND temperature BETWEEN 30 AND 40
      ),

      temp_scoring AS (
          SELECT
              today_temp - baseline_temp as temp_elevation,

              CASE
                  WHEN today_temp - baseline_temp > 1.5 THEN 50.0
                  WHEN today_temp - baseline_temp > 1.0 THEN 70.0
                  WHEN today_temp - baseline_temp > 0.5 THEN 85.0
                  ELSE 100.0
              END as temp_cap,

              CASE
                  WHEN today_temp - baseline_temp > 1.5 THEN 'critical'
                  WHEN today_temp - baseline_temp > 1.0 THEN 'high'
                  WHEN today_temp - baseline_temp > 0.5 THEN 'medium'
                  ELSE NULL
              END as temp_alert_level,

              CASE
                  WHEN today_temp - baseline_temp > 1.5
                      THEN 'Temperature elevated - possible illness'
                  WHEN today_temp - baseline_temp > 1.0
                      THEN 'Slight temperature elevation'
                  WHEN today_temp - baseline_temp > 0.5
                      THEN 'Minor temperature elevation'
                  ELSE NULL
              END as temp_message

          FROM temp_analysis
      ),

      spo2_analysis AS (
          SELECT
              AVG("automaticSpo2Data") as overnight_avg,
              COUNT(CASE WHEN "automaticSpo2Data" < 90 THEN 1 END) as dips_below_90
          FROM spo2
          WHERE DATE_TRUNC('day', TO_TIMESTAMP(date)) =
                (SELECT today_utc FROM date_params)
              AND "automaticSpo2Data" BETWEEN 70 AND 100
      ),
      
      spo2_scoring AS (
          SELECT 
              overnight_avg,
              dips_below_90,
              
              CASE 
                  WHEN overnight_avg < 88 THEN 40.0
                  WHEN overnight_avg < 92 THEN 60.0
                  WHEN dips_below_90 > 10 THEN 70.0
                  ELSE 100.0
              END as spo2_cap,
              
              CASE 
                  WHEN overnight_avg < 88 THEN 'critical'
                  WHEN overnight_avg < 92 THEN 'high'
                  WHEN dips_below_90 > 10 THEN 'medium'
                  ELSE NULL
              END as spo2_alert_level,
              
              CASE 
                  WHEN overnight_avg < 88 
                      THEN 'Low oxygen levels detected'
                  WHEN overnight_avg < 92 
                      THEN 'Oxygen levels below normal'
                  WHEN dips_below_90 > 10 
                      THEN 'Multiple oxygen dips during sleep'
                  ELSE NULL
              END as spo2_message
              
          FROM spo2_analysis
      )
      
      SELECT 
          t.temp_elevation,
          t.temp_cap,
          t.temp_alert_level,
          t.temp_message,
          s.overnight_avg as spo2_avg,
          s.dips_below_90,
          s.spo2_cap,
          s.spo2_alert_level,
          s.spo2_message
      FROM temp_scoring t
      CROSS JOIN spo2_scoring s
  ),

  vitality_calculation AS (
      SELECT 
          COALESCE(h.recovery_score_100, 0.0) as recovery_score,
          COALESCE(sq.sleep_quality_score_100, 0.0) as sleep_score,
          COALESCE(sc.consistency_score_100, 0.0) as consistency_score,
          COALESCE(r.rhr_score_100, 0.0) as rhr_score,
          
          CASE WHEN h.recovery_score_100 IS NOT NULL THEN 0.35 ELSE 0.0 END +
          CASE WHEN sq.sleep_quality_score_100 IS NOT NULL THEN 0.30 ELSE 0.0 END +
          CASE WHEN sc.consistency_score_100 IS NOT NULL THEN 0.20 ELSE 0.0 END +
          CASE WHEN r.rhr_score_100 IS NOT NULL THEN 0.15 ELSE 0.0 END as total_weight,
          
          -- Sleep details
          sq.total_minutes as sleep_minutes,
          sq.session_count as sleep_sessions,
          sq.deep_minutes,
          sq.rem_minutes,
          sq.light_minutes,
          sq.awake_minutes,
          
          -- HRV details
          h.hrv_readings,
          h.current_hrv,
          h.baseline_mean as baseline_hrv,
          h.adjusted_z_score as hrv_z_score,
          
          -- RHR details
          r.today_rhr,
          r.baseline_rhr,
          r.rhr_change,
          
          -- Component messages
          h.recovery_status,
          h.recovery_message,
          h.confidence_message,
          sq.duration_message,
          sq.continuity_message,
          sq.architecture_message,
          sq.sleep_alert,
          sc.consistency_message,
          sc.consistency_alert,
          r.rhr_status,
          r.rhr_message,
          r.rhr_alert,
          
          -- Safety
          sm.temp_cap,
          sm.temp_alert_level,
          sm.temp_message,
          sm.spo2_cap,
          sm.spo2_alert_level,
          sm.spo2_message,
          
          75.0 as yesterday_score
          
      FROM hrv_component h
      CROSS JOIN sleep_quality_component sq
      LEFT JOIN sleep_consistency_component sc ON true
      LEFT JOIN rhr_component r ON true
      CROSS JOIN safety_modifiers sm
  )

  SELECT 
      -- Vitality Score
      CAST(LEAST(
          CASE 
              WHEN total_weight > 0 THEN
                  ROUND((
                      recovery_score * 0.35 +
                      sleep_score * 0.30 +
                      consistency_score * 0.20 +
                      rhr_score * 0.15
                  ) / total_weight * 100.0)
              ELSE NULL
          END,
          temp_cap,
          spo2_cap
      ) AS INTEGER) as vitality_score,
      
      CASE 
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 85 THEN 'Excellent'
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 70 THEN 'Good'
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 55 THEN 'Fair'
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 40 THEN 'Low'
          ELSE 'Poor'
      END as vitality_category,
      
      CASE 
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 85 THEN 'Excellent vitality!'
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 70 THEN 'Good energy today'
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 55 THEN 'Moderate vitality'
          WHEN LEAST(
              ROUND((recovery_score * 0.35 + sleep_score * 0.30 + 
                    consistency_score * 0.20 + rhr_score * 0.15) / total_weight * 100.0),
              temp_cap, spo2_cap) >= 40 THEN 'Low energy today'
          ELSE 'Focus on recovery'
      END as primary_message,
      
      -- Component Breakdown
      CAST(ROUND(recovery_score, 1) AS DOUBLE) as recovery_score,
      CAST(ROUND(recovery_score * 0.35, 1) AS DOUBLE) as recovery_points,
      CAST(ROUND(sleep_score, 1) AS DOUBLE) as sleep_score,
      CAST(ROUND(sleep_score * 0.30, 1) AS DOUBLE) as sleep_points,
      CAST(ROUND(consistency_score, 1) AS DOUBLE) as consistency_score,
      CAST(ROUND(consistency_score * 0.20, 1) AS DOUBLE) as consistency_points,
      CAST(ROUND(rhr_score, 1) AS DOUBLE) as rhr_score,
      CAST(ROUND(rhr_score * 0.15, 1) AS DOUBLE) as rhr_points,
      
      -- Sleep Details
      sleep_minutes,
      CAST(ROUND(sleep_minutes / 60.0, 1) AS DOUBLE) as sleep_hours,
      sleep_sessions,
      deep_minutes,
      rem_minutes,
      light_minutes,
      awake_minutes,
      CASE WHEN sleep_minutes > 0 
          THEN CAST(ROUND((deep_minutes + rem_minutes) * 100.0 / sleep_minutes, 1) AS DOUBLE)
          ELSE 0.0 
      END as restorative_pct,
      
      -- HRV Details
      hrv_readings,
      CAST(ROUND(current_hrv, 1) AS DOUBLE) as current_hrv,
      CAST(ROUND(baseline_hrv, 1) AS DOUBLE) as baseline_hrv,
      CAST(ROUND(hrv_z_score, 2) AS DOUBLE) as hrv_z_score,
      
      -- RHR Details
      CAST(ROUND(today_rhr, 1) AS DOUBLE) as today_rhr,
      CAST(ROUND(baseline_rhr, 1) AS DOUBLE) as baseline_rhr,
      CAST(ROUND(rhr_change, 1) AS DOUBLE) as rhr_change,
      
      -- Messages
      recovery_status,
      recovery_message,
      confidence_message,
      duration_message as sleep_duration_message,
      continuity_message as sleep_continuity_message,
      architecture_message as sleep_architecture_message,
      consistency_message,
      rhr_status,
      rhr_message,
      
      -- Safety
      temp_message,
      spo2_message,
      
      -- Metadata
      CAST(ROUND(total_weight * 100, 0) AS INTEGER) as data_completeness_pct,
      CURRENT_TIMESTAMP AT TIME ZONE 'UTC' as calculated_at_utc
      
  FROM vitality_calculation;
  "#;

    let vitality_result = query("zivaring", vitality_sql_query, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "vitality Result: {} status: {} (Time taken: {:.3} seconds)",
      vitality_result["json_value"],
      vitality_result["status"],
      duration.as_secs_f64()
    );
  }

  Ok(())
}

#[allow(dead_code)]
async fn ziva_app_queries() -> Result<(), Box<dyn std::error::Error>> {
  println!("\n=== ZIVA APP QUERIES COMPONENT ===");
  const STORAGE_PATH: &str = "tmp";
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let _ = init_timon(STORAGE_PATH, 43200, usernames[0]).unwrap();

  for username in &usernames {
    println!("\n=== Testing Ziva App queries for user: {} ===", username);
    let start_time = Instant::now();

    let query_x = r#"
  WITH transformed AS (
      SELECT 
        step,
        calories,
        TO_CHAR(TO_TIMESTAMP(date)::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles', '%Y.%m.%d %H:%M:%S') AS date,
        TO_CHAR(TO_TIMESTAMP(date)::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles', '%Y.%m.%d') AS day,
        TO_CHAR(TO_TIMESTAMP(date)::TIMESTAMP AT TIME ZONE 'UTC' AT TIME ZONE 'America/Los_Angeles', '%H') AS hour
      FROM activitydetails 
      WHERE date BETWEEN '1758499200' AND '1758585599'
    )
    SELECT 
      day, 
      hour, 
      MAX(date) AS date,
      SUM(calories) AS calories,
      SUM(step) AS value
    FROM transformed
    GROUP BY day, hour
    ORDER BY day, hour;
  "#;

    let result_x = query("zivaring", &query_x, None, None).await?;
    let duration = start_time.elapsed();
    println!(
      "result_x: {} status: {} (Time taken: {:.3} seconds)",
      result_x["json_value"],
      result_x["status"],
      duration.as_secs_f64()
    );
  }

  Ok(())
}

#[allow(dead_code)]
async fn ziva_username_query_matching() -> Result<(), Box<dyn std::error::Error>> {
  println!("\n=== ZIVA USERNAME QUERY MATCHING COMPONENT ===");
  const STORAGE_PATH: &str = "tmp";
  const USERNAME: &str = "ahmed_test";
  let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();

  let query_x = "SELECT COUNT(*) as full_counter FROM spo2;";

  const USERNAME1: &str = "rADQkoFBr4Pks9Y2H_sriram";
  const USERNAME2: &str = "7TQBn6aSe49wfnuox_roshann";

  println!("\n--- Query with None (default path) ---");
  let result_x = query("zivaring", &query_x, None, None).await?;
  let count_none = result_x["json_value"]
    .as_array()
    .and_then(|arr| arr.get(0))
    .and_then(|obj| obj.get("full_counter"))
    .and_then(|v| v.as_u64())
    .unwrap_or(0);

  println!("\n--- Query with User1 ({}) ---", USERNAME1);
  let result_x1 = query("zivaring", &query_x, Some(USERNAME1), None).await?;
  let count_user1 = result_x1["json_value"]
    .as_array()
    .and_then(|arr| arr.get(0))
    .and_then(|obj| obj.get("full_counter"))
    .and_then(|v| v.as_u64())
    .unwrap_or(0);

  println!("\n--- Query with User2 ({}) ---", USERNAME2);
  let result_x2 = query("zivaring", &query_x, Some(USERNAME2), None).await?;
  let count_user2 = result_x2["json_value"]
    .as_array()
    .and_then(|arr| arr.get(0))
    .and_then(|obj| obj.get("full_counter"))
    .and_then(|v| v.as_u64())
    .unwrap_or(0);

  // Summary
  println!("\n--- Summary ---");
  println!("  None: {} rows", count_none);
  println!("  User1: {} rows", count_user1);
  println!("  User2: {} rows", count_user2);

  if count_none == count_user1 && count_user1 == count_user2 {
    println!("\n  ⚠️  WARNING: All queries returned the same count ({})!", count_none);
    println!("  ⚠️  This suggests they're all querying the SAME path (likely the default path).");
  } else {
    println!("\n  ✅ Different counts detected - queries are using different paths correctly.");
  }

  Ok(())
}

#[allow(dead_code)]
async fn check_ziva_fecth_time() -> Result<(), Box<dyn std::error::Error>> {
  println!("\n=== CHECK ZIVA FETCH TIME COMPONENT ===");

  const STORAGE_PATH: &str = "tmp";
  const USERNAME: &str = "ahmed_test";
  let _ = init_timon(STORAGE_PATH, 43200, USERNAME).unwrap();
  init_bucket(
    "https://s3.us-west-2.amazonaws.com",
    "zivaoneapp",
    "xxxxxxxxxx",
    "xxxxxxxxxx",
    "us-west-2",
  )
  .unwrap();

  let start_time = Instant::now();

  // Prepare batch fetch parameters
  let usernames = ["rADQkoFBr4Pks9Y2H_sriram", "7TQBn6aSe49wfnuox_roshann", "MtvcHGtLWZ23hS3KT_spalaniswamy"];
  let db_names = ["zivaring"];
  let table_names = [
    "activitydetails",
    "battery_table",
    "blood_glucose_table",
    "heartrate",
    "hrv_table",
    "sleep",
    "spo2",
    "temperature_table",
  ];
  let fetch_range = std::collections::HashMap::from([("start_date", "2025-01-01"), ("end_date", "2025-12-30")]);

  // for table_name in table_names {
  //   let result = cloud_sink_parquet("zivaring", table_name).await;
  //   match result {
  //     Ok(value) => {
  //       println!("Cloud sink parquet: {}", value);
  //     }
  //     Err(_) => {
  //       println!("Error in cloud sink parquet: {}", result.err().unwrap());
  //     }
  //   }
  // }

  // Use batch fetch for parallel execution
  let result = cloud_fetch_parquet_batch(&usernames, &db_names, &table_names, fetch_range).await;

  let end_time = Instant::now();
  let duration = end_time.duration_since(start_time);

  match result {
    Ok(value) => {
      let json_value: serde_json::Value = serde_json::from_str(&value.to_string()).unwrap_or(json!({}));
      let default_json = json!({});
      let json_data = json_value.get("json_value").unwrap_or(&default_json);
      let success_count = json_data.get("success_count").and_then(|v| v.as_u64()).unwrap_or(0);
      let error_count = json_data.get("error_count").and_then(|v| v.as_u64()).unwrap_or(0);
      let total_tasks = json_data.get("total_tasks").and_then(|v| v.as_u64()).unwrap_or(0);

      // Count files per user (approximate - each table typically has multiple parquet files)
      let files_per_user = table_names.len();

      let status_msg: String = if error_count == 0 {
        "No synchronization needed (all local files current)".to_string()
      } else {
        format!("{} successful, {} failed", success_count, error_count)
      };

      println!(
        "Cloud fetch performance:\nScope: {} users, {} tables, {} parquet files\nDistribution: {} ({} files), {} ({} files)\nExecution time: around {:.2} seconds\nStatus: {}",
        usernames.len(),
        table_names.len(),
        total_tasks,
        usernames[0],
        files_per_user,
        usernames[1],
        files_per_user,
        duration.as_secs_f64(),
        status_msg
      );
      if let Some(errors) = json_data.get("errors") {
        if errors.as_array().map(|a| !a.is_empty()).unwrap_or(false) {
          println!("Errors: {:?}", errors);
        }
      }
    }
    Err(e) => {
      eprintln!("Error in cloud_fetch_parquet_batch: {}", e);
      return Err(Box::new(std::io::Error::new(
        std::io::ErrorKind::Other,
        format!("cloud_fetch_parquet_batch failed: {}", e),
      )));
    }
  }

  Ok(())
}