Skip to main content

meerkat_mobkit/runtime/
session_store.rs

1//! Session store subsystem — persistence backends and session lifecycle operations.
2
3use super::*;
4
/// Identifies which persistence backend a session store contract refers to.
///
/// Variants are (de)serialized using snake_case names because of the
/// `rename_all` attribute below.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionStoreKind {
    /// Sessions persisted in a BigQuery table.
    BigQuery,
    /// Sessions persisted in a local JSON file.
    JsonFile,
}
11
/// Capability description for one session store backend.
///
/// `session_store_contracts` builds one of these per [`SessionStoreKind`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionStoreContract {
    /// Backend this contract describes.
    pub store: SessionStoreKind,
    /// Reported as `true` for both backends by `session_store_contracts`.
    pub latest_row_per_session: bool,
    /// Reported as `true` for both backends by `session_store_contracts`.
    pub tombstones_supported: bool,
    /// Reported as `true` for both backends by `session_store_contracts`.
    pub dedup_read_path: bool,
    /// Reported as `true` only for the JSON file backend.
    pub file_locking: bool,
    /// Reported as `true` only for the JSON file backend.
    pub crash_recovery: bool,
    /// BigQuery dataset name; `None` for the JSON file backend.
    pub bigquery_dataset: Option<String>,
    /// BigQuery table name; `None` for the JSON file backend.
    pub bigquery_table: Option<String>,
}
23
/// One persisted version of a session.
///
/// Several rows may exist for the same `session_id`; readers pick the one with
/// the highest `updated_at_ms`. Every field falls back to its `Default` value
/// when missing during deserialization.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionPersistenceRow {
    /// Identifier used to group rows belonging to the same session.
    #[serde(default)]
    pub session_id: String,
    /// Version timestamp in milliseconds; larger values supersede smaller ones.
    #[serde(default)]
    pub updated_at_ms: u64,
    /// Tombstone flag: rows with `deleted == true` are dropped from "live" reads.
    #[serde(default)]
    pub deleted: bool,
    /// Arbitrary JSON payload stored for the session.
    #[serde(default)]
    pub payload: Value,
    /// String key/value labels attached to the row.
    #[serde(default)]
    pub labels: BTreeMap<String, String>,
}
37
/// Contents of the JSON file store's lock file.
///
/// Written when the lock is acquired and read back when deciding whether an
/// existing lock is stale.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct JsonStoreLockRecord {
    /// Process id of the lock holder (`std::process::id()` at acquisition time).
    pub owner_pid: u32,
    /// Acquisition time in milliseconds, as returned by `current_time_ms()`.
    pub created_at_ms: u64,
}
43
/// Errors returned by [`JsonFileSessionStore`] operations.
///
/// String payloads carry the `to_string()` rendering of the underlying error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JsonFileSessionStoreError {
    /// A filesystem operation failed.
    Io(String),
    /// Rows or the lock record could not be serialized to JSON.
    Serialize(String),
    /// The data file exists but does not parse as a list of rows.
    InvalidStoreData(String),
    /// Another holder owns the lock file at `lock_path` and it is not stale.
    LockHeld { lock_path: String },
    /// A stale lock file was detected but could not be removed.
    StaleLockRecoveryFailed(String),
}
52
/// Session store backed by a single JSON file guarded by a lock file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonFileSessionStore {
    /// Path of the JSON file holding all persisted rows.
    data_path: PathBuf,
    /// Path of the lock file created while rows are being appended.
    lock_path: PathBuf,
    /// Minimum age before an existing lock file may be considered stale.
    stale_lock_threshold: Duration,
}
59
/// Errors returned by [`BigQuerySessionStoreAdapter`] operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BigQuerySessionStoreError {
    /// I/O failure. NOTE(review): not constructed in this file — confirm whether still used.
    Io(String),
    /// A row payload or label map could not be serialized to JSON.
    Serialize(String),
    /// Required configuration (project id or access token) is missing.
    Configuration(String),
    /// The HTTP request could not be sent or its body could not be read.
    Http(String),
    /// BigQuery answered with a non-success status or reported row insert errors.
    Api(String),
    /// The response body was not valid JSON or did not have the expected shape.
    InvalidQueryResponse(String),
    /// NOTE(review): not constructed in this file — presumably left over from a
    /// CLI-based implementation; confirm before removing.
    ProcessFailed { command: String, stderr: String },
}
70
/// Session store adapter that talks to the BigQuery REST API over HTTP.
#[derive(Debug, Clone)]
pub struct BigQuerySessionStoreAdapter {
    /// Dataset containing the session table.
    dataset: String,
    /// Table holding the session rows.
    table: String,
    /// Explicit project id; when unset, `BIGQUERY_PROJECT_ID` is consulted.
    project_id: Option<String>,
    /// Base URL of the BigQuery API (trailing slashes are trimmed on use).
    api_base_url: String,
    /// Explicit bearer token; when unset, environment variables are consulted.
    access_token: Option<String>,
    /// Timeout the HTTP client was configured with.
    http_timeout: Duration,
    /// HTTP client used for every request.
    client: reqwest::Client,
}
81
/// RAII handle for the JSON store's lock file: the file at `lock_path` is
/// deleted when the guard goes out of scope.
struct JsonFileLockGuard {
    /// Lock file to delete on drop.
    lock_path: PathBuf,
}

impl Drop for JsonFileLockGuard {
    /// Removes the lock file on a best-effort basis; a failure (for example the
    /// file already being gone) is deliberately ignored.
    fn drop(&mut self) {
        fs::remove_file(&self.lock_path).ok();
    }
}
91
92pub fn session_store_contracts(decisions: &RuntimeDecisionState) -> Vec<SessionStoreContract> {
93    vec![
94        SessionStoreContract {
95            store: SessionStoreKind::BigQuery,
96            latest_row_per_session: true,
97            tombstones_supported: true,
98            dedup_read_path: true,
99            file_locking: false,
100            crash_recovery: false,
101            bigquery_dataset: Some(decisions.bigquery.dataset.clone()),
102            bigquery_table: Some(decisions.bigquery.table.clone()),
103        },
104        SessionStoreContract {
105            store: SessionStoreKind::JsonFile,
106            latest_row_per_session: true,
107            tombstones_supported: true,
108            dedup_read_path: true,
109            file_locking: true,
110            crash_recovery: true,
111            bigquery_dataset: None,
112            bigquery_table: None,
113        },
114    ]
115}
116
117pub fn materialize_latest_session_rows(
118    rows: &[SessionPersistenceRow],
119) -> Vec<SessionPersistenceRow> {
120    let mut latest_by_session: BTreeMap<String, SessionPersistenceRow> = BTreeMap::new();
121    for row in rows {
122        let should_replace = match latest_by_session.get(&row.session_id) {
123            Some(existing) => row.updated_at_ms >= existing.updated_at_ms,
124            None => true,
125        };
126        if should_replace {
127            latest_by_session.insert(row.session_id.clone(), row.clone());
128        }
129    }
130    latest_by_session.into_values().collect()
131}
132
133pub fn materialize_live_session_rows(rows: &[SessionPersistenceRow]) -> Vec<SessionPersistenceRow> {
134    materialize_latest_session_rows(rows)
135        .into_iter()
136        .filter(|row| !row.deleted)
137        .collect()
138}
139
140impl JsonFileSessionStore {
141    pub fn new(data_path: impl AsRef<Path>) -> Self {
142        let data_path = data_path.as_ref().to_path_buf();
143        let lock_path = data_path.with_extension("lock");
144        Self {
145            data_path,
146            lock_path,
147            stale_lock_threshold: Duration::from_secs(30),
148        }
149    }
150
151    pub fn with_lock_path(mut self, lock_path: impl AsRef<Path>) -> Self {
152        self.lock_path = lock_path.as_ref().to_path_buf();
153        self
154    }
155
156    pub fn with_stale_lock_threshold(mut self, threshold: Duration) -> Self {
157        self.stale_lock_threshold = threshold;
158        self
159    }
160
161    pub fn data_path(&self) -> &Path {
162        &self.data_path
163    }
164
165    pub fn lock_path(&self) -> &Path {
166        &self.lock_path
167    }
168
169    pub fn append_rows(
170        &self,
171        rows: &[SessionPersistenceRow],
172    ) -> Result<(), JsonFileSessionStoreError> {
173        let _guard = self.acquire_lock()?;
174        if let Some(parent) = self.data_path.parent() {
175            fs::create_dir_all(parent)
176                .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
177        }
178
179        let mut persisted = self.read_rows()?;
180        persisted.extend(rows.iter().cloned());
181
182        let tmp_path = self.data_path.with_extension("tmp");
183        let json = serde_json::to_vec_pretty(&persisted)
184            .map_err(|err| JsonFileSessionStoreError::Serialize(err.to_string()))?;
185        fs::write(&tmp_path, json).map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
186        fs::rename(&tmp_path, &self.data_path)
187            .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
188        Ok(())
189    }
190
191    pub fn read_rows(&self) -> Result<Vec<SessionPersistenceRow>, JsonFileSessionStoreError> {
192        if !self.data_path.exists() {
193            return Ok(vec![]);
194        }
195        let bytes = fs::read(&self.data_path)
196            .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
197        serde_json::from_slice::<Vec<SessionPersistenceRow>>(&bytes)
198            .map_err(|err| JsonFileSessionStoreError::InvalidStoreData(err.to_string()))
199    }
200
201    pub fn read_latest_rows(
202        &self,
203    ) -> Result<Vec<SessionPersistenceRow>, JsonFileSessionStoreError> {
204        let rows = self.read_rows()?;
205        Ok(materialize_latest_session_rows(&rows))
206    }
207
208    pub fn read_live_rows(&self) -> Result<Vec<SessionPersistenceRow>, JsonFileSessionStoreError> {
209        let rows = self.read_rows()?;
210        Ok(materialize_live_session_rows(&rows))
211    }
212
213    fn acquire_lock(&self) -> Result<JsonFileLockGuard, JsonFileSessionStoreError> {
214        if let Some(parent) = self.lock_path.parent() {
215            fs::create_dir_all(parent)
216                .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
217        }
218
219        let mut attempts = 0_u8;
220        loop {
221            attempts += 1;
222            match OpenOptions::new()
223                .create_new(true)
224                .write(true)
225                .open(&self.lock_path)
226            {
227                Ok(mut file) => {
228                    let lock_record = JsonStoreLockRecord {
229                        owner_pid: std::process::id(),
230                        created_at_ms: current_time_ms(),
231                    };
232                    let lock_bytes = serde_json::to_vec(&lock_record)
233                        .map_err(|err| JsonFileSessionStoreError::Serialize(err.to_string()))?;
234                    file.write_all(&lock_bytes)
235                        .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
236                    return Ok(JsonFileLockGuard {
237                        lock_path: self.lock_path.clone(),
238                    });
239                }
240                Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
241                    if attempts >= 2 {
242                        return Err(JsonFileSessionStoreError::LockHeld {
243                            lock_path: self.lock_path.display().to_string(),
244                        });
245                    }
246                    if self.is_lock_stale()? {
247                        fs::remove_file(&self.lock_path).map_err(|remove_err| {
248                            JsonFileSessionStoreError::StaleLockRecoveryFailed(
249                                remove_err.to_string(),
250                            )
251                        })?;
252                        continue;
253                    }
254                    return Err(JsonFileSessionStoreError::LockHeld {
255                        lock_path: self.lock_path.display().to_string(),
256                    });
257                }
258                Err(err) => return Err(JsonFileSessionStoreError::Io(err.to_string())),
259            }
260        }
261    }
262
263    fn is_lock_stale(&self) -> Result<bool, JsonFileSessionStoreError> {
264        let bytes = fs::read(&self.lock_path)
265            .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
266        let stale_threshold_ms = self.stale_lock_threshold.as_millis() as u64;
267        if let Ok(record) = serde_json::from_slice::<JsonStoreLockRecord>(&bytes) {
268            let age_ms = current_time_ms().saturating_sub(record.created_at_ms);
269            if age_ms < stale_threshold_ms {
270                return Ok(false);
271            }
272            return Ok(!is_process_alive(record.owner_pid));
273        }
274
275        let modified = fs::metadata(&self.lock_path)
276            .and_then(|meta| meta.modified())
277            .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
278        let age = SystemTime::now()
279            .duration_since(modified)
280            .unwrap_or_default();
281        Ok(age >= self.stale_lock_threshold)
282    }
283}
284
/// Probes whether a process with id `pid` exists by running `kill -0 <pid>`.
///
/// * `pid == 0` is reported as not alive without running anything.
/// * Otherwise the result is the success of the `kill` exit status.
/// * If the `kill` command cannot be run at all, the process is assumed to be
///   alive so that a potentially active lock is never evicted.
fn is_process_alive(pid: u32) -> bool {
    if pid == 0 {
        return false;
    }
    Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .map_or(true, |exit| exit.success())
}
301
302impl BigQuerySessionStoreAdapter {
303    pub const DEFAULT_API_BASE_URL: &'static str = "https://bigquery.googleapis.com/bigquery/v2";
304
305    pub fn new(
306        _legacy_bq_binary: impl AsRef<Path>,
307        dataset: impl Into<String>,
308        table: impl Into<String>,
309    ) -> Self {
310        Self::new_native(dataset, table)
311    }
312
313    pub fn new_native(dataset: impl Into<String>, table: impl Into<String>) -> Self {
314        let http_timeout = Duration::from_secs(30);
315        Self {
316            dataset: dataset.into(),
317            table: table.into(),
318            project_id: None,
319            api_base_url: Self::DEFAULT_API_BASE_URL.to_string(),
320            access_token: None,
321            client: reqwest::Client::builder()
322                .timeout(http_timeout)
323                .build()
324                .unwrap_or_default(),
325            http_timeout,
326        }
327    }
328
329    pub fn with_project_id(mut self, project_id: impl Into<String>) -> Self {
330        self.project_id = Some(project_id.into());
331        self
332    }
333
334    pub fn with_api_base_url(mut self, api_base_url: impl Into<String>) -> Self {
335        self.api_base_url = api_base_url.into();
336        self
337    }
338
339    pub fn with_access_token(mut self, access_token: impl Into<String>) -> Self {
340        self.access_token = Some(access_token.into());
341        self
342    }
343
344    pub fn with_http_timeout(mut self, timeout: Duration) -> Self {
345        self.http_timeout = timeout;
346        self.client = reqwest::Client::builder()
347            .timeout(timeout)
348            .build()
349            .unwrap_or_default();
350        self
351    }
352
353    pub fn with_bearer_token(self, access_token: impl Into<String>) -> Self {
354        self.with_access_token(access_token)
355    }
356
357    pub fn table_ref(&self) -> String {
358        format!("{}.{}", self.dataset, self.table)
359    }
360
361    pub async fn stream_insert_rows(
362        &self,
363        rows: &[SessionPersistenceRow],
364    ) -> Result<(), BigQuerySessionStoreError> {
365        if rows.is_empty() {
366            return Ok(());
367        }
368
369        let project_id = self.resolve_project_id()?;
370        let access_token = self.resolve_access_token()?;
371        let endpoint = format!(
372            "{}/projects/{project_id}/datasets/{}/tables/{}/insertAll",
373            self.api_base_url(),
374            self.dataset,
375            self.table
376        );
377
378        let mut request_rows = Vec::with_capacity(rows.len());
379        for row in rows {
380            let payload_json = serde_json::to_string(&row.payload)
381                .map_err(|err| BigQuerySessionStoreError::Serialize(err.to_string()))?;
382            let mut row_json = serde_json::json!({
383                "session_id": row.session_id,
384                "updated_at_ms": row.updated_at_ms.to_string(),
385                "deleted": row.deleted,
386                "payload": payload_json,
387            });
388            if !row.labels.is_empty() {
389                let labels_json = serde_json::to_string(&row.labels)
390                    .map_err(|err| BigQuerySessionStoreError::Serialize(err.to_string()))?;
391                row_json["labels_json"] = serde_json::Value::String(labels_json);
392            }
393            request_rows.push(serde_json::json!({ "json": row_json }));
394        }
395        let request = serde_json::json!({
396            "ignoreUnknownValues": false,
397            "skipInvalidRows": false,
398            "rows": request_rows,
399        });
400
401        let response = self
402            .send_json_request(
403                reqwest::Method::POST,
404                &endpoint,
405                &access_token,
406                Some(&request),
407            )
408            .await?;
409        if let Some(errors) = response.get("insertErrors").and_then(Value::as_array)
410            && !errors.is_empty()
411        {
412            let detail =
413                serde_json::to_string(errors).unwrap_or_else(|_| "<serialize_error>".to_string());
414            return Err(BigQuerySessionStoreError::Api(format!(
415                "BigQuery insertAll returned row errors: {detail}"
416            )));
417        }
418
419        Ok(())
420    }
421
422    pub async fn read_rows(&self) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
423        let fq_table = self.fully_qualified_table()?;
424        let query = format!(
425            "SELECT session_id, updated_at_ms, deleted, payload, labels_json FROM `{fq_table}` ORDER BY updated_at_ms ASC"
426        );
427        self.execute_query(&query).await
428    }
429
430    pub async fn read_latest_rows(
431        &self,
432    ) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
433        let fq_table = self.fully_qualified_table()?;
434        let query = format!(
435            "SELECT session_id, updated_at_ms, deleted, payload, labels_json \
436             FROM `{fq_table}` \
437             QUALIFY ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY updated_at_ms DESC) = 1"
438        );
439        self.execute_query(&query).await
440    }
441
442    pub async fn read_live_rows(
443        &self,
444    ) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
445        let fq_table = self.fully_qualified_table()?;
446        let query = format!(
447            "SELECT session_id, updated_at_ms, deleted, payload, labels_json FROM (\
448               SELECT session_id, updated_at_ms, deleted, payload, labels_json \
449               FROM `{fq_table}` \
450               QUALIFY ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY updated_at_ms DESC) = 1\
451             ) WHERE deleted = false"
452        );
453        self.execute_query(&query).await
454    }
455
456    fn fully_qualified_table(&self) -> Result<String, BigQuerySessionStoreError> {
457        let project_id = self.resolve_project_id()?;
458        Ok(format!("{}.{}", project_id, self.table_ref()))
459    }
460
461    async fn execute_query(
462        &self,
463        query: &str,
464    ) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
465        let project_id = self.resolve_project_id()?;
466        let access_token = self.resolve_access_token()?;
467        let endpoint = format!("{}/projects/{project_id}/queries", self.api_base_url());
468        let request = serde_json::json!({
469            "query": query,
470            "useLegacySql": false,
471            "maxResults": 10000,
472        });
473        let response = self
474            .send_json_request(
475                reqwest::Method::POST,
476                &endpoint,
477                &access_token,
478                Some(&request),
479            )
480            .await?;
481        parse_bigquery_query_rows(&response)
482    }
483
484    pub async fn gc_superseded_rows(&self) -> Result<u64, BigQuerySessionStoreError> {
485        let project_id = self.resolve_project_id()?;
486        let access_token = self.resolve_access_token()?;
487        let table_ref = self.table_ref();
488        let endpoint = format!("{}/projects/{project_id}/queries", self.api_base_url());
489        let query = format!(
490            "DELETE FROM `{project_id}.{table_ref}` AS t \
491             WHERE STRUCT(t.session_id, t.updated_at_ms) NOT IN ( \
492               SELECT AS STRUCT session_id, MAX(updated_at_ms) \
493               FROM `{project_id}.{table_ref}` \
494               GROUP BY session_id \
495             )"
496        );
497        let request = serde_json::json!({ "query": query, "useLegacySql": false });
498        let response = self
499            .send_json_request(
500                reqwest::Method::POST,
501                &endpoint,
502                &access_token,
503                Some(&request),
504            )
505            .await?;
506        let affected = response
507            .get("numDmlAffectedRows")
508            .and_then(|v| v.as_str())
509            .and_then(|s| s.parse::<u64>().ok())
510            .unwrap_or(0);
511        Ok(affected)
512    }
513
514    pub async fn truncate_sessions(&self) -> Result<(), BigQuerySessionStoreError> {
515        let project_id = self.resolve_project_id()?;
516        let access_token = self.resolve_access_token()?;
517        let table_ref = self.table_ref();
518        let endpoint = format!("{}/projects/{project_id}/queries", self.api_base_url());
519        let query = format!("TRUNCATE TABLE `{project_id}.{table_ref}`");
520        let request = serde_json::json!({ "query": query, "useLegacySql": false });
521        self.send_json_request(
522            reqwest::Method::POST,
523            &endpoint,
524            &access_token,
525            Some(&request),
526        )
527        .await?;
528        Ok(())
529    }
530
531    fn api_base_url(&self) -> &str {
532        self.api_base_url.trim_end_matches('/')
533    }
534
535    fn resolve_project_id(&self) -> Result<String, BigQuerySessionStoreError> {
536        if let Some(project_id) = self.project_id.as_deref() {
537            let project = project_id.trim();
538            if !project.is_empty() {
539                return Ok(project.to_string());
540            }
541        }
542
543        if let Ok(project_id) = std::env::var("BIGQUERY_PROJECT_ID") {
544            let project = project_id.trim();
545            if !project.is_empty() {
546                return Ok(project.to_string());
547            }
548        }
549
550        Err(BigQuerySessionStoreError::Configuration(
551            "missing BigQuery project_id: call with_project_id(...) or set BIGQUERY_PROJECT_ID"
552                .to_string(),
553        ))
554    }
555
556    fn resolve_access_token(&self) -> Result<String, BigQuerySessionStoreError> {
557        if let Some(token) = self.access_token.as_deref() {
558            let token = token.trim();
559            if !token.is_empty() {
560                return Ok(token.to_string());
561            }
562        }
563
564        for key in [
565            "BIGQUERY_ACCESS_TOKEN",
566            "GOOGLE_OAUTH_ACCESS_TOKEN",
567            "GOOGLE_ACCESS_TOKEN",
568        ] {
569            if let Ok(token) = std::env::var(key) {
570                let token = token.trim();
571                if !token.is_empty() {
572                    return Ok(token.to_string());
573                }
574            }
575        }
576
577        Err(BigQuerySessionStoreError::Configuration(
578            "missing BigQuery access token: call with_access_token(...) or set BIGQUERY_ACCESS_TOKEN"
579                .to_string(),
580        ))
581    }
582
583    async fn send_json_request(
584        &self,
585        method: reqwest::Method,
586        endpoint: &str,
587        access_token: &str,
588        body: Option<&Value>,
589    ) -> Result<Value, BigQuerySessionStoreError> {
590        let mut request = self
591            .client
592            .request(method, endpoint)
593            .bearer_auth(access_token)
594            .header("accept", "application/json");
595        if let Some(body) = body {
596            request = request
597                .header("content-type", "application/json")
598                .json(body);
599        }
600
601        let response = request
602            .send()
603            .await
604            .map_err(|err| BigQuerySessionStoreError::Http(format!("{err:?}")))?;
605        let status = response.status();
606        let text = response
607            .text()
608            .await
609            .map_err(|err| BigQuerySessionStoreError::Http(format!("{err:?}")))?;
610
611        if !status.is_success() {
612            return Err(BigQuerySessionStoreError::Api(format!(
613                "BigQuery API request failed (status {}): {}",
614                status.as_u16(),
615                text
616            )));
617        }
618
619        if text.trim().is_empty() {
620            return Ok(serde_json::json!({}));
621        }
622
623        serde_json::from_str::<Value>(&text)
624            .map_err(|err| BigQuerySessionStoreError::InvalidQueryResponse(err.to_string()))
625    }
626}
627
/// Settings for the periodic BigQuery garbage collection loop.
#[derive(Debug, Clone)]
pub struct BigQueryGcConfig {
    /// Time to wait before each garbage collection pass.
    pub interval: Duration,
}

impl Default for BigQueryGcConfig {
    /// Defaults to one garbage collection pass every six hours.
    fn default() -> Self {
        const SIX_HOURS_IN_SECS: u64 = 6 * 60 * 60;
        BigQueryGcConfig {
            interval: Duration::from_secs(SIX_HOURS_IN_SECS),
        }
    }
}
640
/// Callback for GC error reporting. Receives the error message.
///
/// The periodic GC loop invokes it when the async runtime cannot be created
/// and whenever a garbage collection pass fails.
pub type GcErrorCallback = std::sync::Arc<dyn Fn(String) + Send + Sync>;
643
644pub fn run_periodic_gc(
645    adapter: BigQuerySessionStoreAdapter,
646    config: BigQueryGcConfig,
647) -> impl FnOnce() {
648    run_periodic_gc_with_error_callback(adapter, config, None)
649}
650
651pub fn run_periodic_gc_with_error_callback(
652    adapter: BigQuerySessionStoreAdapter,
653    config: BigQueryGcConfig,
654    on_error: Option<GcErrorCallback>,
655) -> impl FnOnce() {
656    move || {
657        let rt = match tokio::runtime::Builder::new_current_thread()
658            .enable_all()
659            .build()
660        {
661            Ok(rt) => rt,
662            Err(err) => {
663                let msg = format!("failed to create async runtime for BQ GC: {err}");
664                eprintln!("[mobkit-gc] {msg}");
665                if let Some(ref cb) = on_error {
666                    cb(msg);
667                }
668                return;
669            }
670        };
671        loop {
672            std::thread::sleep(config.interval);
673            match rt.block_on(adapter.gc_superseded_rows()) {
674                Ok(deleted) => {
675                    if deleted > 0 {
676                        eprintln!("[mobkit-gc] deleted {deleted} superseded session rows");
677                    }
678                }
679                Err(err) => {
680                    let msg = format!("gc_superseded_rows failed: {err:?}");
681                    eprintln!("[mobkit-gc] {msg}");
682                    if let Some(ref cb) = on_error {
683                        cb(msg);
684                    }
685                }
686            }
687        }
688    }
689}
690
691fn parse_bigquery_query_rows(
692    response: &Value,
693) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
694    if response.is_array() {
695        return serde_json::from_value::<Vec<SessionPersistenceRow>>(response.clone())
696            .map_err(|err| BigQuerySessionStoreError::InvalidQueryResponse(err.to_string()));
697    }
698
699    let rows = response
700        .get("rows")
701        .and_then(Value::as_array)
702        .cloned()
703        .unwrap_or_default();
704    let mut parsed = Vec::with_capacity(rows.len());
705    for row in rows {
706        parsed.push(parse_bigquery_query_row(&row)?);
707    }
708
709    Ok(parsed)
710}
711
712fn parse_bigquery_query_row(
713    row: &Value,
714) -> Result<SessionPersistenceRow, BigQuerySessionStoreError> {
715    let fields = row.get("f").and_then(Value::as_array).ok_or_else(|| {
716        BigQuerySessionStoreError::InvalidQueryResponse(
717            "missing row.f cell array in query response".to_string(),
718        )
719    })?;
720    if fields.len() < 4 {
721        return Err(BigQuerySessionStoreError::InvalidQueryResponse(
722            "query response row has fewer than 4 columns".to_string(),
723        ));
724    }
725
726    let session_id = parse_bigquery_string_cell(&fields[0], "session_id")?;
727    let updated_at_ms = parse_bigquery_u64_cell(&fields[1], "updated_at_ms")?;
728    let deleted = parse_bigquery_bool_cell(&fields[2], "deleted")?;
729    let payload = parse_bigquery_payload_cell(&fields[3], "payload")?;
730    let labels = if fields.len() > 4 {
731        parse_bigquery_labels_cell(&fields[4])?
732    } else {
733        BTreeMap::new()
734    };
735
736    Ok(SessionPersistenceRow {
737        session_id,
738        updated_at_ms,
739        deleted,
740        payload,
741        labels,
742    })
743}
744
745fn parse_bigquery_string_cell(
746    cell: &Value,
747    column: &str,
748) -> Result<String, BigQuerySessionStoreError> {
749    let value = bigquery_cell_value(cell);
750    match value {
751        Value::String(s) => Ok(s.clone()),
752        _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
753            "query column {column} is not a string"
754        ))),
755    }
756}
757
758fn parse_bigquery_u64_cell(cell: &Value, column: &str) -> Result<u64, BigQuerySessionStoreError> {
759    let value = bigquery_cell_value(cell);
760    match value {
761        Value::Number(num) => num.as_u64().ok_or_else(|| {
762            BigQuerySessionStoreError::InvalidQueryResponse(format!(
763                "query column {column} is not a u64 number"
764            ))
765        }),
766        Value::String(s) => s.parse::<u64>().map_err(|_| {
767            BigQuerySessionStoreError::InvalidQueryResponse(format!(
768                "query column {column} is not a u64 string"
769            ))
770        }),
771        _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
772            "query column {column} is not a u64 value"
773        ))),
774    }
775}
776
777fn parse_bigquery_bool_cell(cell: &Value, column: &str) -> Result<bool, BigQuerySessionStoreError> {
778    let value = bigquery_cell_value(cell);
779    match value {
780        Value::Bool(flag) => Ok(*flag),
781        Value::String(s) => match s.as_str() {
782            "true" | "TRUE" | "1" => Ok(true),
783            "false" | "FALSE" | "0" => Ok(false),
784            _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
785                "query column {column} is not a bool string"
786            ))),
787        },
788        _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
789            "query column {column} is not a bool value"
790        ))),
791    }
792}
793
794fn parse_bigquery_payload_cell(
795    cell: &Value,
796    column: &str,
797) -> Result<Value, BigQuerySessionStoreError> {
798    let value = bigquery_cell_value(cell);
799    match value {
800        Value::Null => Ok(serde_json::json!({})),
801        Value::String(s) => {
802            if s.trim().is_empty() {
803                return Ok(serde_json::json!({}));
804            }
805            serde_json::from_str::<Value>(s).map_err(|_| {
806                BigQuerySessionStoreError::InvalidQueryResponse(format!(
807                    "query column {column} payload JSON parse failed"
808                ))
809            })
810        }
811        _ => Ok(value.clone()),
812    }
813}
814
815fn parse_bigquery_labels_cell(
816    cell: &Value,
817) -> Result<BTreeMap<String, String>, BigQuerySessionStoreError> {
818    let value = bigquery_cell_value(cell);
819    match value {
820        Value::Null => Ok(BTreeMap::new()),
821        Value::String(s) if s.trim().is_empty() => Ok(BTreeMap::new()),
822        Value::String(s) => serde_json::from_str::<BTreeMap<String, String>>(s).map_err(|_| {
823            BigQuerySessionStoreError::InvalidQueryResponse(
824                "query column labels_json parse failed".to_string(),
825            )
826        }),
827        _ => Ok(BTreeMap::new()),
828    }
829}
830
831fn bigquery_cell_value(cell: &Value) -> &Value {
832    cell.get("v").unwrap_or(cell)
833}