ig_client/storage/
utils.rs

1use crate::application::models::transaction::StoreTransaction;
2use crate::error::AppError;
3use crate::storage::config::DatabaseConfig;
4use serde::Serialize;
5use serde::de::DeserializeOwned;
6use serde_json;
7use sqlx::{Executor, PgPool};
8use tracing::info;
9
10/// Stores a list of transactions in the database
11///
12/// # Arguments
13/// * `pool` - PostgreSQL connection pool
14/// * `txs` - List of transactions to store
15///
16/// # Returns
17/// * `Result<usize, AppError>` - Number of transactions inserted or an error
18pub async fn store_transactions(
19    pool: &sqlx::PgPool,
20    txs: &[StoreTransaction],
21) -> Result<usize, AppError> {
22    let mut tx = pool.begin().await?;
23    let mut inserted = 0;
24
25    for t in txs {
26        let result = tx
27            .execute(
28                sqlx::query(
29                    r#"
30                    INSERT INTO ig_options (
31                        reference, deal_date, underlying, strike,
32                        option_type, expiry, transaction_type, pnl_eur, is_fee, raw
33                    )
34                    VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
35                    ON CONFLICT (raw_hash) DO NOTHING
36                    "#,
37                )
38                .bind(&t.reference)
39                .bind(t.deal_date)
40                .bind(&t.underlying)
41                .bind(t.strike)
42                .bind(&t.option_type)
43                .bind(t.expiry)
44                .bind(&t.transaction_type)
45                .bind(t.pnl_eur)
46                .bind(t.is_fee)
47                .bind(&t.raw_json),
48            )
49            .await?;
50
51        inserted += result.rows_affected() as usize;
52    }
53
54    tx.commit().await?;
55    Ok(inserted)
56}
57
58/// Serializes a value to a JSON string
59pub fn serialize_to_json<T: Serialize>(value: &T) -> Result<String, serde_json::Error> {
60    serde_json::to_string(value)
61}
62
63/// Deserializes a JSON string into a value
64pub fn deserialize_from_json<T: DeserializeOwned>(s: &str) -> Result<T, serde_json::Error> {
65    serde_json::from_str(s)
66}
67
68/// Creates a PostgreSQL connection pool from database configuration
69///
70/// # Arguments
71/// * `config` - Database configuration containing URL and max connections
72///
73/// # Returns
74/// * `Result<PgPool, AppError>` - Connection pool or an error
75pub async fn create_connection_pool(config: &DatabaseConfig) -> Result<PgPool, AppError> {
76    info!(
77        "Creating PostgreSQL connection pool with max {} connections",
78        config.max_connections
79    );
80
81    let pool = sqlx::postgres::PgPoolOptions::new()
82        .max_connections(config.max_connections)
83        .connect(&config.url)
84        .await
85        .map_err(AppError::Db)?;
86
87    info!("PostgreSQL connection pool created successfully");
88    Ok(pool)
89}
90
91/// Creates a database configuration from environment variables
92///
93/// # Returns
94/// * `Result<DatabaseConfig, AppError>` - Database configuration or an error
95pub fn create_database_config_from_env() -> Result<DatabaseConfig, AppError> {
96    let url = std::env::var("DATABASE_URL").map_err(|_| {
97        AppError::InvalidInput("DATABASE_URL environment variable is required".to_string())
98    })?;
99
100    let max_connections = std::env::var("DATABASE_MAX_CONNECTIONS")
101        .unwrap_or_else(|_| "10".to_string())
102        .parse::<u32>()
103        .map_err(|e| {
104            AppError::InvalidInput(format!("Invalid DATABASE_MAX_CONNECTIONS value: {e}"))
105        })?;
106
107    Ok(DatabaseConfig {
108        url,
109        max_connections,
110    })
111}