use anyhow::Result;
use chrono::{Duration, Utc};
use sqlx::PgPool;
use systemprompt_identifiers::{SessionId, UserId};
use super::types::CreateSessionParams;
/// Refreshes the activity timestamp and running duration of one session.
///
/// For the `user_sessions` row whose `session_id` matches, sets
/// `last_activity_at` to `CURRENT_TIMESTAMP` and recomputes `duration_seconds`
/// as the seconds elapsed since `started_at`, cast to `INTEGER`.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. The affected-row
/// count is not inspected, so an id that matches no row still yields `Ok(())`.
pub async fn update_activity(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    sqlx::query!(
        r#"
UPDATE user_sessions
SET last_activity_at = CURRENT_TIMESTAMP,
duration_seconds = EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at))::INTEGER
WHERE session_id = $1
"#,
        session_id.as_str()
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Counts one more request against a session.
///
/// In a single `UPDATE`, adds 1 to `request_count`, sets `last_activity_at` to
/// `CURRENT_TIMESTAMP`, and recomputes `duration_seconds` as the seconds
/// elapsed since `started_at` (cast to `INTEGER`).
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn increment_request_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let session_key = session_id.as_str();

    sqlx::query!(
        r#"
UPDATE user_sessions
SET request_count = request_count + 1,
last_activity_at = CURRENT_TIMESTAMP,
duration_seconds = EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at))::INTEGER
WHERE session_id = $1
"#,
        session_key
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Counts one more task against a session.
///
/// Adds 1 to `task_count` and sets `last_activity_at` to `CURRENT_TIMESTAMP`
/// for the matching `user_sessions` row. `duration_seconds` is left untouched
/// by this statement.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn increment_task_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    sqlx::query!(
        "UPDATE user_sessions SET task_count = task_count + 1, \
         last_activity_at = CURRENT_TIMESTAMP WHERE session_id = $1",
        session_id.as_str()
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Counts one more AI request against a session.
///
/// Adds 1 to `ai_request_count` and sets `last_activity_at` to
/// `CURRENT_TIMESTAMP` for the matching `user_sessions` row. The increment is
/// a plain `ai_request_count + 1`; no `COALESCE` is applied here.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn increment_ai_request_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let session_key = session_id.as_str();

    sqlx::query!(
        "UPDATE user_sessions SET ai_request_count = ai_request_count + 1, \
         last_activity_at = CURRENT_TIMESTAMP WHERE session_id = $1",
        session_key
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Counts one more message against a session.
///
/// Adds 1 to `message_count` and sets `last_activity_at` to
/// `CURRENT_TIMESTAMP` for the matching `user_sessions` row.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn increment_message_count(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    sqlx::query!(
        "UPDATE user_sessions SET message_count = message_count + 1, \
         last_activity_at = CURRENT_TIMESTAMP WHERE session_id = $1",
        session_id.as_str()
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Marks a session as ended.
///
/// Sets `ended_at` to `CURRENT_TIMESTAMP` and recomputes `duration_seconds` as
/// the seconds elapsed since `started_at` (cast to `INTEGER`).
///
/// The `WHERE` clause filters on `session_id` only, so calling this for a
/// session that already has an `ended_at` overwrites both columns again.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn end_session(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let session_key = session_id.as_str();

    sqlx::query!(
        r#"
UPDATE user_sessions
SET ended_at = CURRENT_TIMESTAMP,
duration_seconds = EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - started_at))::INTEGER
WHERE session_id = $1
"#,
        session_key
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Flags a session as a scanner.
///
/// Sets `is_scanner` to `true` on the matching `user_sessions` row. No other
/// column (including `last_activity_at`) is modified.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn mark_as_scanner(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    sqlx::query!(
        "UPDATE user_sessions SET is_scanner = true WHERE session_id = $1",
        session_id.as_str()
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Records the moment a session converted, at most once.
///
/// Sets `converted_at` to `CURRENT_TIMESTAMP`, but only while `converted_at`
/// is still `NULL`; a row that already carries a conversion timestamp is left
/// as it is.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. Matching zero
/// rows (unknown id or already converted) is not reported as an error.
pub async fn mark_converted(pool: &PgPool, session_id: &SessionId) -> Result<()> {
    let session_key = session_id.as_str();

    sqlx::query!(
        "UPDATE user_sessions SET converted_at = CURRENT_TIMESTAMP \
         WHERE session_id = $1 AND converted_at IS NULL",
        session_key
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Ends every open session that has been idle for longer than `inactive_hours`.
///
/// The cutoff is computed on the application side as `Utc::now()` minus
/// `inactive_hours`, then bound into the query. Rows with `ended_at IS NULL`
/// and `last_activity_at` earlier than that cutoff get `ended_at` set to
/// `CURRENT_TIMESTAMP`. `duration_seconds` is not modified by this statement.
///
/// Returns the number of rows the `UPDATE` affected.
///
/// # Errors
///
/// Propagates any error raised while executing the statement.
pub async fn cleanup_inactive(pool: &PgPool, inactive_hours: i32) -> Result<u64> {
    let idle_window = Duration::hours(i64::from(inactive_hours));
    let cutoff = Utc::now() - idle_window;

    let ended = sqlx::query!(
        r#"
UPDATE user_sessions
SET ended_at = CURRENT_TIMESTAMP
WHERE ended_at IS NULL AND last_activity_at < $1
"#,
        cutoff
    )
    .execute(pool)
    .await?
    .rows_affected();

    Ok(ended)
}
/// Reassigns all sessions from one user id to another.
///
/// Every `user_sessions` row whose `user_id` equals `old_user_id` has its
/// `user_id` rewritten to `new_user_id`.
///
/// Returns the number of rows the `UPDATE` affected.
///
/// # Errors
///
/// Propagates any error raised while executing the statement.
pub async fn migrate_user_sessions(
    pool: &PgPool,
    old_user_id: &UserId,
    new_user_id: &UserId,
) -> Result<u64> {
    // Bind order follows the placeholders: $1 is the new owner, $2 the old one.
    let moved = sqlx::query!(
        "UPDATE user_sessions SET user_id = $1 WHERE user_id = $2",
        new_user_id.as_str(),
        old_user_id.as_str()
    )
    .execute(pool)
    .await?
    .rows_affected();

    Ok(moved)
}
/// Inserts a new row into `user_sessions` from the supplied parameters.
///
/// Twenty-two columns are bound from `params` (placeholders `$1`..`$22`, in
/// the same order as the column list). `started_at` and `last_activity_at` are
/// both set by the database to `CURRENT_TIMESTAMP`.
///
/// # Errors
///
/// Propagates any error raised while executing the `INSERT`.
pub async fn create_session(pool: &PgPool, params: &CreateSessionParams<'_>) -> Result<()> {
    // These three fields are bound through their string views; the optional
    // user id stays optional.
    let sid = params.session_id.as_str();
    let uid = params.user_id.map(UserId::as_str);
    let source = params.session_source.as_str();

    sqlx::query!(
        r#"
INSERT INTO user_sessions (
session_id, user_id, session_source, fingerprint_hash, ip_address, user_agent,
device_type, browser, os, country, region, city, preferred_locale,
referrer_source, referrer_url, landing_page, entry_url,
utm_source, utm_medium, utm_campaign, is_bot, expires_at,
started_at, last_activity_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
"#,
        // $1..$3: identity
        sid,
        uid,
        source,
        // $4..$9: client fingerprint and device
        params.fingerprint_hash,
        params.ip_address,
        params.user_agent,
        params.device_type,
        params.browser,
        params.os,
        // $10..$13: location and locale
        params.country,
        params.region,
        params.city,
        params.preferred_locale,
        // $14..$20: referrer, entry points and UTM tags
        params.referrer_source,
        params.referrer_url,
        params.landing_page,
        params.entry_url,
        params.utm_source,
        params.utm_medium,
        params.utm_campaign,
        // $21..$22: bot flag and expiry
        params.is_bot,
        params.expires_at
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Adds one AI request plus its token and cost usage to a session's totals.
///
/// In a single `UPDATE` on the matching `user_sessions` row:
/// - `ai_request_count` grows by 1,
/// - `total_tokens_used` grows by `tokens`,
/// - `total_ai_cost_microdollars` grows by `cost_microdollars`,
/// - `last_activity_at` becomes `CURRENT_TIMESTAMP`.
///
/// Each running total is wrapped in `COALESCE(.., 0)`, so a `NULL` column is
/// treated as zero before the addition.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn increment_ai_usage(
    pool: &PgPool,
    session_id: &SessionId,
    tokens: i32,
    cost_microdollars: i64,
) -> Result<()> {
    sqlx::query!(
        r#"
UPDATE user_sessions
SET ai_request_count = COALESCE(ai_request_count, 0) + 1,
total_tokens_used = COALESCE(total_tokens_used, 0) + $1,
total_ai_cost_microdollars = COALESCE(total_ai_cost_microdollars, 0) + $2,
last_activity_at = CURRENT_TIMESTAMP
WHERE session_id = $3
"#,
        tokens,
        cost_microdollars,
        session_id.as_str()
    )
    .execute(pool)
    .await?;

    Ok(())
}
/// Stores a new throttle level for a session and stamps when it was set.
///
/// Writes `new_level` into `throttle_level` and sets `throttle_escalated_at`
/// to `CURRENT_TIMESTAMP`. The value is written unconditionally: the statement
/// does not compare `new_level` with the level currently stored.
///
/// # Errors
///
/// Propagates any error raised while executing the statement. A session id
/// that matches no row is not reported as an error.
pub async fn escalate_throttle(
    pool: &PgPool,
    session_id: &SessionId,
    new_level: i32,
) -> Result<()> {
    let session_key = session_id.as_str();

    sqlx::query!(
        r#"
UPDATE user_sessions
SET throttle_level = $1,
throttle_escalated_at = CURRENT_TIMESTAMP
WHERE session_id = $2
"#,
        new_level,
        session_key
    )
    .execute(pool)
    .await?;

    Ok(())
}