1use super::*;
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6#[serde(rename_all = "snake_case")]
7pub enum SessionStoreKind {
8 BigQuery,
9 JsonFile,
10}
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub struct SessionStoreContract {
14 pub store: SessionStoreKind,
15 pub latest_row_per_session: bool,
16 pub tombstones_supported: bool,
17 pub dedup_read_path: bool,
18 pub file_locking: bool,
19 pub crash_recovery: bool,
20 pub bigquery_dataset: Option<String>,
21 pub bigquery_table: Option<String>,
22}
23
24#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
25pub struct SessionPersistenceRow {
26 #[serde(default)]
27 pub session_id: String,
28 #[serde(default)]
29 pub updated_at_ms: u64,
30 #[serde(default)]
31 pub deleted: bool,
32 #[serde(default)]
33 pub payload: Value,
34 #[serde(default)]
35 pub labels: BTreeMap<String, String>,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct JsonStoreLockRecord {
40 pub owner_pid: u32,
41 pub created_at_ms: u64,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub enum JsonFileSessionStoreError {
46 Io(String),
47 Serialize(String),
48 InvalidStoreData(String),
49 LockHeld { lock_path: String },
50 StaleLockRecoveryFailed(String),
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct JsonFileSessionStore {
55 data_path: PathBuf,
56 lock_path: PathBuf,
57 stale_lock_threshold: Duration,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq)]
61pub enum BigQuerySessionStoreError {
62 Io(String),
63 Serialize(String),
64 Configuration(String),
65 Http(String),
66 Api(String),
67 InvalidQueryResponse(String),
68 ProcessFailed { command: String, stderr: String },
69}
70
71#[derive(Debug, Clone)]
72pub struct BigQuerySessionStoreAdapter {
73 dataset: String,
74 table: String,
75 project_id: Option<String>,
76 api_base_url: String,
77 access_token: Option<String>,
78 http_timeout: Duration,
79 client: reqwest::Client,
80}
81
82struct JsonFileLockGuard {
83 lock_path: PathBuf,
84}
85
86impl Drop for JsonFileLockGuard {
87 fn drop(&mut self) {
88 let _ = fs::remove_file(&self.lock_path);
89 }
90}
91
92pub fn session_store_contracts(decisions: &RuntimeDecisionState) -> Vec<SessionStoreContract> {
93 vec![
94 SessionStoreContract {
95 store: SessionStoreKind::BigQuery,
96 latest_row_per_session: true,
97 tombstones_supported: true,
98 dedup_read_path: true,
99 file_locking: false,
100 crash_recovery: false,
101 bigquery_dataset: Some(decisions.bigquery.dataset.clone()),
102 bigquery_table: Some(decisions.bigquery.table.clone()),
103 },
104 SessionStoreContract {
105 store: SessionStoreKind::JsonFile,
106 latest_row_per_session: true,
107 tombstones_supported: true,
108 dedup_read_path: true,
109 file_locking: true,
110 crash_recovery: true,
111 bigquery_dataset: None,
112 bigquery_table: None,
113 },
114 ]
115}
116
117pub fn materialize_latest_session_rows(
118 rows: &[SessionPersistenceRow],
119) -> Vec<SessionPersistenceRow> {
120 let mut latest_by_session: BTreeMap<String, SessionPersistenceRow> = BTreeMap::new();
121 for row in rows {
122 let should_replace = match latest_by_session.get(&row.session_id) {
123 Some(existing) => row.updated_at_ms >= existing.updated_at_ms,
124 None => true,
125 };
126 if should_replace {
127 latest_by_session.insert(row.session_id.clone(), row.clone());
128 }
129 }
130 latest_by_session.into_values().collect()
131}
132
133pub fn materialize_live_session_rows(rows: &[SessionPersistenceRow]) -> Vec<SessionPersistenceRow> {
134 materialize_latest_session_rows(rows)
135 .into_iter()
136 .filter(|row| !row.deleted)
137 .collect()
138}
139
140impl JsonFileSessionStore {
141 pub fn new(data_path: impl AsRef<Path>) -> Self {
142 let data_path = data_path.as_ref().to_path_buf();
143 let lock_path = data_path.with_extension("lock");
144 Self {
145 data_path,
146 lock_path,
147 stale_lock_threshold: Duration::from_secs(30),
148 }
149 }
150
151 pub fn with_lock_path(mut self, lock_path: impl AsRef<Path>) -> Self {
152 self.lock_path = lock_path.as_ref().to_path_buf();
153 self
154 }
155
156 pub fn with_stale_lock_threshold(mut self, threshold: Duration) -> Self {
157 self.stale_lock_threshold = threshold;
158 self
159 }
160
161 pub fn data_path(&self) -> &Path {
162 &self.data_path
163 }
164
165 pub fn lock_path(&self) -> &Path {
166 &self.lock_path
167 }
168
169 pub fn append_rows(
170 &self,
171 rows: &[SessionPersistenceRow],
172 ) -> Result<(), JsonFileSessionStoreError> {
173 let _guard = self.acquire_lock()?;
174 if let Some(parent) = self.data_path.parent() {
175 fs::create_dir_all(parent)
176 .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
177 }
178
179 let mut persisted = self.read_rows()?;
180 persisted.extend(rows.iter().cloned());
181
182 let tmp_path = self.data_path.with_extension("tmp");
183 let json = serde_json::to_vec_pretty(&persisted)
184 .map_err(|err| JsonFileSessionStoreError::Serialize(err.to_string()))?;
185 fs::write(&tmp_path, json).map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
186 fs::rename(&tmp_path, &self.data_path)
187 .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
188 Ok(())
189 }
190
191 pub fn read_rows(&self) -> Result<Vec<SessionPersistenceRow>, JsonFileSessionStoreError> {
192 if !self.data_path.exists() {
193 return Ok(vec![]);
194 }
195 let bytes = fs::read(&self.data_path)
196 .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
197 serde_json::from_slice::<Vec<SessionPersistenceRow>>(&bytes)
198 .map_err(|err| JsonFileSessionStoreError::InvalidStoreData(err.to_string()))
199 }
200
201 pub fn read_latest_rows(
202 &self,
203 ) -> Result<Vec<SessionPersistenceRow>, JsonFileSessionStoreError> {
204 let rows = self.read_rows()?;
205 Ok(materialize_latest_session_rows(&rows))
206 }
207
208 pub fn read_live_rows(&self) -> Result<Vec<SessionPersistenceRow>, JsonFileSessionStoreError> {
209 let rows = self.read_rows()?;
210 Ok(materialize_live_session_rows(&rows))
211 }
212
213 fn acquire_lock(&self) -> Result<JsonFileLockGuard, JsonFileSessionStoreError> {
214 if let Some(parent) = self.lock_path.parent() {
215 fs::create_dir_all(parent)
216 .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
217 }
218
219 let mut attempts = 0_u8;
220 loop {
221 attempts += 1;
222 match OpenOptions::new()
223 .create_new(true)
224 .write(true)
225 .open(&self.lock_path)
226 {
227 Ok(mut file) => {
228 let lock_record = JsonStoreLockRecord {
229 owner_pid: std::process::id(),
230 created_at_ms: current_time_ms(),
231 };
232 let lock_bytes = serde_json::to_vec(&lock_record)
233 .map_err(|err| JsonFileSessionStoreError::Serialize(err.to_string()))?;
234 file.write_all(&lock_bytes)
235 .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
236 return Ok(JsonFileLockGuard {
237 lock_path: self.lock_path.clone(),
238 });
239 }
240 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
241 if attempts >= 2 {
242 return Err(JsonFileSessionStoreError::LockHeld {
243 lock_path: self.lock_path.display().to_string(),
244 });
245 }
246 if self.is_lock_stale()? {
247 fs::remove_file(&self.lock_path).map_err(|remove_err| {
248 JsonFileSessionStoreError::StaleLockRecoveryFailed(
249 remove_err.to_string(),
250 )
251 })?;
252 continue;
253 }
254 return Err(JsonFileSessionStoreError::LockHeld {
255 lock_path: self.lock_path.display().to_string(),
256 });
257 }
258 Err(err) => return Err(JsonFileSessionStoreError::Io(err.to_string())),
259 }
260 }
261 }
262
263 fn is_lock_stale(&self) -> Result<bool, JsonFileSessionStoreError> {
264 let bytes = fs::read(&self.lock_path)
265 .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
266 let stale_threshold_ms = self.stale_lock_threshold.as_millis() as u64;
267 if let Ok(record) = serde_json::from_slice::<JsonStoreLockRecord>(&bytes) {
268 let age_ms = current_time_ms().saturating_sub(record.created_at_ms);
269 if age_ms < stale_threshold_ms {
270 return Ok(false);
271 }
272 return Ok(!is_process_alive(record.owner_pid));
273 }
274
275 let modified = fs::metadata(&self.lock_path)
276 .and_then(|meta| meta.modified())
277 .map_err(|err| JsonFileSessionStoreError::Io(err.to_string()))?;
278 let age = SystemTime::now()
279 .duration_since(modified)
280 .unwrap_or_default();
281 Ok(age >= self.stale_lock_threshold)
282 }
283}
284
285fn is_process_alive(pid: u32) -> bool {
286 if pid == 0 {
287 return false;
288 }
289 let status = Command::new("kill")
290 .arg("-0")
291 .arg(pid.to_string())
292 .stdout(Stdio::null())
293 .stderr(Stdio::null())
294 .status();
295 match status {
296 Ok(exit_status) => exit_status.success(),
297 Err(_) => true,
299 }
300}
301
302impl BigQuerySessionStoreAdapter {
303 pub const DEFAULT_API_BASE_URL: &'static str = "https://bigquery.googleapis.com/bigquery/v2";
304
305 pub fn new(
306 _legacy_bq_binary: impl AsRef<Path>,
307 dataset: impl Into<String>,
308 table: impl Into<String>,
309 ) -> Self {
310 Self::new_native(dataset, table)
311 }
312
313 pub fn new_native(dataset: impl Into<String>, table: impl Into<String>) -> Self {
314 let http_timeout = Duration::from_secs(30);
315 Self {
316 dataset: dataset.into(),
317 table: table.into(),
318 project_id: None,
319 api_base_url: Self::DEFAULT_API_BASE_URL.to_string(),
320 access_token: None,
321 client: reqwest::Client::builder()
322 .timeout(http_timeout)
323 .build()
324 .unwrap_or_default(),
325 http_timeout,
326 }
327 }
328
329 pub fn with_project_id(mut self, project_id: impl Into<String>) -> Self {
330 self.project_id = Some(project_id.into());
331 self
332 }
333
334 pub fn with_api_base_url(mut self, api_base_url: impl Into<String>) -> Self {
335 self.api_base_url = api_base_url.into();
336 self
337 }
338
339 pub fn with_access_token(mut self, access_token: impl Into<String>) -> Self {
340 self.access_token = Some(access_token.into());
341 self
342 }
343
344 pub fn with_http_timeout(mut self, timeout: Duration) -> Self {
345 self.http_timeout = timeout;
346 self.client = reqwest::Client::builder()
347 .timeout(timeout)
348 .build()
349 .unwrap_or_default();
350 self
351 }
352
353 pub fn with_bearer_token(self, access_token: impl Into<String>) -> Self {
354 self.with_access_token(access_token)
355 }
356
357 pub fn table_ref(&self) -> String {
358 format!("{}.{}", self.dataset, self.table)
359 }
360
361 pub async fn stream_insert_rows(
362 &self,
363 rows: &[SessionPersistenceRow],
364 ) -> Result<(), BigQuerySessionStoreError> {
365 if rows.is_empty() {
366 return Ok(());
367 }
368
369 let project_id = self.resolve_project_id()?;
370 let access_token = self.resolve_access_token()?;
371 let endpoint = format!(
372 "{}/projects/{project_id}/datasets/{}/tables/{}/insertAll",
373 self.api_base_url(),
374 self.dataset,
375 self.table
376 );
377
378 let mut request_rows = Vec::with_capacity(rows.len());
379 for row in rows {
380 let payload_json = serde_json::to_string(&row.payload)
381 .map_err(|err| BigQuerySessionStoreError::Serialize(err.to_string()))?;
382 let mut row_json = serde_json::json!({
383 "session_id": row.session_id,
384 "updated_at_ms": row.updated_at_ms.to_string(),
385 "deleted": row.deleted,
386 "payload": payload_json,
387 });
388 if !row.labels.is_empty() {
389 let labels_json = serde_json::to_string(&row.labels)
390 .map_err(|err| BigQuerySessionStoreError::Serialize(err.to_string()))?;
391 row_json["labels_json"] = serde_json::Value::String(labels_json);
392 }
393 request_rows.push(serde_json::json!({ "json": row_json }));
394 }
395 let request = serde_json::json!({
396 "ignoreUnknownValues": false,
397 "skipInvalidRows": false,
398 "rows": request_rows,
399 });
400
401 let response = self
402 .send_json_request(
403 reqwest::Method::POST,
404 &endpoint,
405 &access_token,
406 Some(&request),
407 )
408 .await?;
409 if let Some(errors) = response.get("insertErrors").and_then(Value::as_array)
410 && !errors.is_empty()
411 {
412 let detail =
413 serde_json::to_string(errors).unwrap_or_else(|_| "<serialize_error>".to_string());
414 return Err(BigQuerySessionStoreError::Api(format!(
415 "BigQuery insertAll returned row errors: {detail}"
416 )));
417 }
418
419 Ok(())
420 }
421
422 pub async fn read_rows(&self) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
423 let fq_table = self.fully_qualified_table()?;
424 let query = format!(
425 "SELECT session_id, updated_at_ms, deleted, payload, labels_json FROM `{fq_table}` ORDER BY updated_at_ms ASC"
426 );
427 self.execute_query(&query).await
428 }
429
430 pub async fn read_latest_rows(
431 &self,
432 ) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
433 let fq_table = self.fully_qualified_table()?;
434 let query = format!(
435 "SELECT session_id, updated_at_ms, deleted, payload, labels_json \
436 FROM `{fq_table}` \
437 QUALIFY ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY updated_at_ms DESC) = 1"
438 );
439 self.execute_query(&query).await
440 }
441
442 pub async fn read_live_rows(
443 &self,
444 ) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
445 let fq_table = self.fully_qualified_table()?;
446 let query = format!(
447 "SELECT session_id, updated_at_ms, deleted, payload, labels_json FROM (\
448 SELECT session_id, updated_at_ms, deleted, payload, labels_json \
449 FROM `{fq_table}` \
450 QUALIFY ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY updated_at_ms DESC) = 1\
451 ) WHERE deleted = false"
452 );
453 self.execute_query(&query).await
454 }
455
456 fn fully_qualified_table(&self) -> Result<String, BigQuerySessionStoreError> {
457 let project_id = self.resolve_project_id()?;
458 Ok(format!("{}.{}", project_id, self.table_ref()))
459 }
460
461 async fn execute_query(
462 &self,
463 query: &str,
464 ) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
465 let project_id = self.resolve_project_id()?;
466 let access_token = self.resolve_access_token()?;
467 let endpoint = format!("{}/projects/{project_id}/queries", self.api_base_url());
468 let request = serde_json::json!({
469 "query": query,
470 "useLegacySql": false,
471 "maxResults": 10000,
472 });
473 let response = self
474 .send_json_request(
475 reqwest::Method::POST,
476 &endpoint,
477 &access_token,
478 Some(&request),
479 )
480 .await?;
481 parse_bigquery_query_rows(&response)
482 }
483
484 pub async fn gc_superseded_rows(&self) -> Result<u64, BigQuerySessionStoreError> {
485 let project_id = self.resolve_project_id()?;
486 let access_token = self.resolve_access_token()?;
487 let table_ref = self.table_ref();
488 let endpoint = format!("{}/projects/{project_id}/queries", self.api_base_url());
489 let query = format!(
490 "DELETE FROM `{project_id}.{table_ref}` AS t \
491 WHERE STRUCT(t.session_id, t.updated_at_ms) NOT IN ( \
492 SELECT AS STRUCT session_id, MAX(updated_at_ms) \
493 FROM `{project_id}.{table_ref}` \
494 GROUP BY session_id \
495 )"
496 );
497 let request = serde_json::json!({ "query": query, "useLegacySql": false });
498 let response = self
499 .send_json_request(
500 reqwest::Method::POST,
501 &endpoint,
502 &access_token,
503 Some(&request),
504 )
505 .await?;
506 let affected = response
507 .get("numDmlAffectedRows")
508 .and_then(|v| v.as_str())
509 .and_then(|s| s.parse::<u64>().ok())
510 .unwrap_or(0);
511 Ok(affected)
512 }
513
514 pub async fn truncate_sessions(&self) -> Result<(), BigQuerySessionStoreError> {
515 let project_id = self.resolve_project_id()?;
516 let access_token = self.resolve_access_token()?;
517 let table_ref = self.table_ref();
518 let endpoint = format!("{}/projects/{project_id}/queries", self.api_base_url());
519 let query = format!("TRUNCATE TABLE `{project_id}.{table_ref}`");
520 let request = serde_json::json!({ "query": query, "useLegacySql": false });
521 self.send_json_request(
522 reqwest::Method::POST,
523 &endpoint,
524 &access_token,
525 Some(&request),
526 )
527 .await?;
528 Ok(())
529 }
530
531 fn api_base_url(&self) -> &str {
532 self.api_base_url.trim_end_matches('/')
533 }
534
535 fn resolve_project_id(&self) -> Result<String, BigQuerySessionStoreError> {
536 if let Some(project_id) = self.project_id.as_deref() {
537 let project = project_id.trim();
538 if !project.is_empty() {
539 return Ok(project.to_string());
540 }
541 }
542
543 if let Ok(project_id) = std::env::var("BIGQUERY_PROJECT_ID") {
544 let project = project_id.trim();
545 if !project.is_empty() {
546 return Ok(project.to_string());
547 }
548 }
549
550 Err(BigQuerySessionStoreError::Configuration(
551 "missing BigQuery project_id: call with_project_id(...) or set BIGQUERY_PROJECT_ID"
552 .to_string(),
553 ))
554 }
555
556 fn resolve_access_token(&self) -> Result<String, BigQuerySessionStoreError> {
557 if let Some(token) = self.access_token.as_deref() {
558 let token = token.trim();
559 if !token.is_empty() {
560 return Ok(token.to_string());
561 }
562 }
563
564 for key in [
565 "BIGQUERY_ACCESS_TOKEN",
566 "GOOGLE_OAUTH_ACCESS_TOKEN",
567 "GOOGLE_ACCESS_TOKEN",
568 ] {
569 if let Ok(token) = std::env::var(key) {
570 let token = token.trim();
571 if !token.is_empty() {
572 return Ok(token.to_string());
573 }
574 }
575 }
576
577 Err(BigQuerySessionStoreError::Configuration(
578 "missing BigQuery access token: call with_access_token(...) or set BIGQUERY_ACCESS_TOKEN"
579 .to_string(),
580 ))
581 }
582
583 async fn send_json_request(
584 &self,
585 method: reqwest::Method,
586 endpoint: &str,
587 access_token: &str,
588 body: Option<&Value>,
589 ) -> Result<Value, BigQuerySessionStoreError> {
590 let mut request = self
591 .client
592 .request(method, endpoint)
593 .bearer_auth(access_token)
594 .header("accept", "application/json");
595 if let Some(body) = body {
596 request = request
597 .header("content-type", "application/json")
598 .json(body);
599 }
600
601 let response = request
602 .send()
603 .await
604 .map_err(|err| BigQuerySessionStoreError::Http(format!("{err:?}")))?;
605 let status = response.status();
606 let text = response
607 .text()
608 .await
609 .map_err(|err| BigQuerySessionStoreError::Http(format!("{err:?}")))?;
610
611 if !status.is_success() {
612 return Err(BigQuerySessionStoreError::Api(format!(
613 "BigQuery API request failed (status {}): {}",
614 status.as_u16(),
615 text
616 )));
617 }
618
619 if text.trim().is_empty() {
620 return Ok(serde_json::json!({}));
621 }
622
623 serde_json::from_str::<Value>(&text)
624 .map_err(|err| BigQuerySessionStoreError::InvalidQueryResponse(err.to_string()))
625 }
626}
627
628#[derive(Debug, Clone)]
629pub struct BigQueryGcConfig {
630 pub interval: Duration,
631}
632
633impl Default for BigQueryGcConfig {
634 fn default() -> Self {
635 Self {
636 interval: Duration::from_hours(6),
637 }
638 }
639}
640
641pub type GcErrorCallback = std::sync::Arc<dyn Fn(String) + Send + Sync>;
643
644pub fn run_periodic_gc(
645 adapter: BigQuerySessionStoreAdapter,
646 config: BigQueryGcConfig,
647) -> impl FnOnce() {
648 run_periodic_gc_with_error_callback(adapter, config, None)
649}
650
651pub fn run_periodic_gc_with_error_callback(
652 adapter: BigQuerySessionStoreAdapter,
653 config: BigQueryGcConfig,
654 on_error: Option<GcErrorCallback>,
655) -> impl FnOnce() {
656 move || {
657 let rt = match tokio::runtime::Builder::new_current_thread()
658 .enable_all()
659 .build()
660 {
661 Ok(rt) => rt,
662 Err(err) => {
663 let msg = format!("failed to create async runtime for BQ GC: {err}");
664 eprintln!("[mobkit-gc] {msg}");
665 if let Some(ref cb) = on_error {
666 cb(msg);
667 }
668 return;
669 }
670 };
671 loop {
672 std::thread::sleep(config.interval);
673 match rt.block_on(adapter.gc_superseded_rows()) {
674 Ok(deleted) => {
675 if deleted > 0 {
676 eprintln!("[mobkit-gc] deleted {deleted} superseded session rows");
677 }
678 }
679 Err(err) => {
680 let msg = format!("gc_superseded_rows failed: {err:?}");
681 eprintln!("[mobkit-gc] {msg}");
682 if let Some(ref cb) = on_error {
683 cb(msg);
684 }
685 }
686 }
687 }
688 }
689}
690
691fn parse_bigquery_query_rows(
692 response: &Value,
693) -> Result<Vec<SessionPersistenceRow>, BigQuerySessionStoreError> {
694 if response.is_array() {
695 return serde_json::from_value::<Vec<SessionPersistenceRow>>(response.clone())
696 .map_err(|err| BigQuerySessionStoreError::InvalidQueryResponse(err.to_string()));
697 }
698
699 let rows = response
700 .get("rows")
701 .and_then(Value::as_array)
702 .cloned()
703 .unwrap_or_default();
704 let mut parsed = Vec::with_capacity(rows.len());
705 for row in rows {
706 parsed.push(parse_bigquery_query_row(&row)?);
707 }
708
709 Ok(parsed)
710}
711
712fn parse_bigquery_query_row(
713 row: &Value,
714) -> Result<SessionPersistenceRow, BigQuerySessionStoreError> {
715 let fields = row.get("f").and_then(Value::as_array).ok_or_else(|| {
716 BigQuerySessionStoreError::InvalidQueryResponse(
717 "missing row.f cell array in query response".to_string(),
718 )
719 })?;
720 if fields.len() < 4 {
721 return Err(BigQuerySessionStoreError::InvalidQueryResponse(
722 "query response row has fewer than 4 columns".to_string(),
723 ));
724 }
725
726 let session_id = parse_bigquery_string_cell(&fields[0], "session_id")?;
727 let updated_at_ms = parse_bigquery_u64_cell(&fields[1], "updated_at_ms")?;
728 let deleted = parse_bigquery_bool_cell(&fields[2], "deleted")?;
729 let payload = parse_bigquery_payload_cell(&fields[3], "payload")?;
730 let labels = if fields.len() > 4 {
731 parse_bigquery_labels_cell(&fields[4])?
732 } else {
733 BTreeMap::new()
734 };
735
736 Ok(SessionPersistenceRow {
737 session_id,
738 updated_at_ms,
739 deleted,
740 payload,
741 labels,
742 })
743}
744
745fn parse_bigquery_string_cell(
746 cell: &Value,
747 column: &str,
748) -> Result<String, BigQuerySessionStoreError> {
749 let value = bigquery_cell_value(cell);
750 match value {
751 Value::String(s) => Ok(s.clone()),
752 _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
753 "query column {column} is not a string"
754 ))),
755 }
756}
757
758fn parse_bigquery_u64_cell(cell: &Value, column: &str) -> Result<u64, BigQuerySessionStoreError> {
759 let value = bigquery_cell_value(cell);
760 match value {
761 Value::Number(num) => num.as_u64().ok_or_else(|| {
762 BigQuerySessionStoreError::InvalidQueryResponse(format!(
763 "query column {column} is not a u64 number"
764 ))
765 }),
766 Value::String(s) => s.parse::<u64>().map_err(|_| {
767 BigQuerySessionStoreError::InvalidQueryResponse(format!(
768 "query column {column} is not a u64 string"
769 ))
770 }),
771 _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
772 "query column {column} is not a u64 value"
773 ))),
774 }
775}
776
777fn parse_bigquery_bool_cell(cell: &Value, column: &str) -> Result<bool, BigQuerySessionStoreError> {
778 let value = bigquery_cell_value(cell);
779 match value {
780 Value::Bool(flag) => Ok(*flag),
781 Value::String(s) => match s.as_str() {
782 "true" | "TRUE" | "1" => Ok(true),
783 "false" | "FALSE" | "0" => Ok(false),
784 _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
785 "query column {column} is not a bool string"
786 ))),
787 },
788 _ => Err(BigQuerySessionStoreError::InvalidQueryResponse(format!(
789 "query column {column} is not a bool value"
790 ))),
791 }
792}
793
794fn parse_bigquery_payload_cell(
795 cell: &Value,
796 column: &str,
797) -> Result<Value, BigQuerySessionStoreError> {
798 let value = bigquery_cell_value(cell);
799 match value {
800 Value::Null => Ok(serde_json::json!({})),
801 Value::String(s) => {
802 if s.trim().is_empty() {
803 return Ok(serde_json::json!({}));
804 }
805 serde_json::from_str::<Value>(s).map_err(|_| {
806 BigQuerySessionStoreError::InvalidQueryResponse(format!(
807 "query column {column} payload JSON parse failed"
808 ))
809 })
810 }
811 _ => Ok(value.clone()),
812 }
813}
814
815fn parse_bigquery_labels_cell(
816 cell: &Value,
817) -> Result<BTreeMap<String, String>, BigQuerySessionStoreError> {
818 let value = bigquery_cell_value(cell);
819 match value {
820 Value::Null => Ok(BTreeMap::new()),
821 Value::String(s) if s.trim().is_empty() => Ok(BTreeMap::new()),
822 Value::String(s) => serde_json::from_str::<BTreeMap<String, String>>(s).map_err(|_| {
823 BigQuerySessionStoreError::InvalidQueryResponse(
824 "query column labels_json parse failed".to_string(),
825 )
826 }),
827 _ => Ok(BTreeMap::new()),
828 }
829}
830
831fn bigquery_cell_value(cell: &Value) -> &Value {
832 cell.get("v").unwrap_or(cell)
833}