// auditor 0.10.2
//
// AccoUnting Data handlIng Toolbox for Opportunistic Resources
// Documentation
use crate::configuration::ArchivalConfig;
use crate::configuration::CompressionType;
use crate::domain::Record;
use anyhow;
use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::RecordBatch;
use chrono::{DateTime, Datelike, Utc};
use chrono::{Months, TimeZone};
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::basic::{Compression, GzipLevel};
use parquet::file::properties::WriterProperties;
use sqlx::FromRow;
use sqlx::{PgPool, Row};
use std::fs::File;
use std::ops::Add;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::info;

/// Service that exports old accounting records to Parquet files and then removes them from
/// the database, driven by a job scheduler.
#[derive(Clone, Debug)]
pub struct ArchiveService {
    /// Connection pool used to read, count and delete `auditor_accounting` rows.
    pub pool: PgPool,
    /// Archival settings (schedule, retention period, output location, file prefix, compression).
    pub config: ArchivalConfig,
}

impl ArchiveService {
    pub fn new(pool: PgPool, config: ArchivalConfig) -> Self {
        Self { pool, config }
    }

    pub async fn start_scheduler(&self) -> anyhow::Result<()> {
        let scheduler = JobScheduler::new().await?;

        let pool = self.pool.clone();
        let config = self.config.clone();

        let job_factory = move |_uuid, _lock| {
            let pool = pool.clone();
            let config = config.clone();
            Box::pin(async move {
                info!("Started scheduled archival process");
                match Self::archive_old_records(pool, config).await {
                    Ok(_) => println!("Successfully archived records"),
                    Err(e) => {
                        println!("Archival process failed. Check the logs for more info {e}",)
                    }
                }
            }) as std::pin::Pin<Box<dyn futures_util::Future<Output = ()> + Send>>
        };

        let oneshot =
            Job::new_one_shot_async(std::time::Duration::from_secs(1), job_factory.clone())?;
        let cronjob = Job::new_async(self.config.cron_schedule.clone().as_str(), job_factory)?;

        scheduler.add(oneshot).await?;
        scheduler.add(cronjob).await?;
        scheduler.start().await?;

        Ok(())
    }

    async fn archive_old_records(pool: PgPool, config: ArchivalConfig) -> anyhow::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("record_id", DataType::Utf8, false),
            Field::new("meta", DataType::Utf8, false),
            Field::new("components", DataType::Utf8, false),
            Field::new(
                "start_time",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "stop_time",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("runtime", DataType::Int64, false),
        ]));

        let archive_period = Months::new(u32::try_from(config.archive_older_than_months)?);
        let current_timestamp = Utc::now();

        let cutoff_timestamp = current_timestamp
            .checked_sub_months(archive_period)
            .expect("Error while constructing cutoff_timestamp for archiving");

        let cutoff_month = cutoff_timestamp.month();
        let cutoff_year = cutoff_timestamp.year();

        let cutoff_timestamp_lower_bound = Utc
            .with_ymd_and_hms(cutoff_year, cutoff_month, 1, 0, 0, 0)
            .unwrap();

        #[derive(Debug, FromRow)]
        struct StopTime {
            stop_time: DateTime<Utc>,
        }
        let oldest_record_timestamp_in_db = sqlx::query_as::<_, StopTime>(
            r"SELECT stop_time FROM auditor_accounting ORDER BY stop_time ASC limit 1;",
        )
        .fetch_optional(&pool)
        .await?;

        let earliest_timestamp = match oldest_record_timestamp_in_db {
            Some(ref record_timestamp) => {
                println!("Oldest record timestamp stop_time: {record_timestamp:?}");
                record_timestamp.stop_time
            }
            None => {
                println!(
                    "The database is empty. No records found in auditor_accounting for archival."
                );
                return Ok(());
            }
        };

        let mut year = earliest_timestamp.year();
        let mut month = earliest_timestamp.month();
        let mut month_to_be_archived_lower_bound =
            Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).unwrap();
        let archive_interval = Months::new(1);
        let mut month_to_be_archived_upper_bound = month_to_be_archived_lower_bound
            .checked_add_months(archive_interval)
            .unwrap();

        while month_to_be_archived_lower_bound < cutoff_timestamp_lower_bound
            && month_to_be_archived_upper_bound <= cutoff_timestamp_lower_bound
        {
            year = month_to_be_archived_lower_bound.year();
            month = month_to_be_archived_lower_bound.month();

            let archive_filename = format!(
                "{}_{}_{}.parquet",
                config.archive_file_prefix, &year, &month,
            );

            let a = month_to_be_archived_lower_bound.to_rfc3339();
            let b = month_to_be_archived_upper_bound.to_rfc3339();

            let archive_dir = Path::new(&config.archive_path);

            if !archive_dir.exists() {
                println!(
                    "Directory does not exist. Creating new directory at {archive_dir:?} (as specified in config.archive_path)"
                );
                if let Err(e) = std::fs::create_dir_all(archive_dir) {
                    eprintln!("Failed to create directory: {e}");
                }
            }

            let path = archive_dir.join(&archive_filename);

            let mut writer: Option<ArrowWriter<File>> = None;
            let mut total_archived = 0i64;
            let mut offset = 0i64;

            loop {
                let records_sql = sqlx::query(
                        r"SELECT record_id,
                                  meta,
                                  components,
                                  start_time,
                                  stop_time,
                                  runtime
                           FROM auditor_accounting WHERE stop_time >= $1::timestamptz AND stop_time < $2::timestamptz ORDER BY id
                                LIMIT $3 OFFSET $4"
                    ).bind(&a).bind(&b).bind(1000000)
                            .bind(offset).fetch_all(&pool).await?;

                if records_sql.is_empty() {
                    break;
                }

                if writer.is_none() {
                    let file = File::create(&path)?;

                    let props = match config.compression_type {
                        CompressionType::Gzip => WriterProperties::builder()
                            .set_compression(Compression::GZIP(GzipLevel::default()))
                            .build(),
                        CompressionType::Snappy => WriterProperties::builder()
                            .set_compression(Compression::SNAPPY)
                            .build(),
                    };

                    writer = Some(ArrowWriter::try_new(file, schema.clone(), Some(props))?);
                }

                let batch_count = records_sql.len() as i64;

                let records: Vec<Record> = records_sql
                    .iter()
                    .map(|row| Record {
                        record_id: row.try_get("record_id").unwrap(),
                        meta: row
                            .try_get("meta")
                            .ok()
                            .and_then(|value| serde_json::from_value(value).ok()),
                        components: row
                            .try_get("components")
                            .ok()
                            .and_then(|value| serde_json::from_value(value).ok()),
                        start_time: row.try_get("start_time").ok().unwrap_or(None),
                        stop_time: row.try_get("stop_time").ok().unwrap_or(None),
                        runtime: row.try_get("runtime").ok().unwrap_or(None),
                    })
                    .collect();

                // Convert data to Arrow arrays
                let record_ids: Vec<String> = records.iter().map(|r| r.record_id.clone()).collect();
                let metas: Vec<String> = records
                    .iter()
                    .map(|r| serde_json::to_string(&r.meta).unwrap())
                    .collect();
                let components: Vec<String> = records
                    .iter()
                    .map(|r| serde_json::to_string(&r.components).unwrap())
                    .collect();
                let start_times: Vec<i64> = records
                    .iter()
                    .map(|r| r.start_time.expect("None value").timestamp_millis())
                    .collect();
                let stop_times: Vec<i64> = records
                    .iter()
                    .map(|r| r.stop_time.expect("None value").timestamp_millis())
                    .collect();
                let runtimes: Vec<i64> = records.iter().map(|r| r.runtime.expect("None")).collect();

                let batch = RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(StringArray::from(record_ids)),
                        Arc::new(StringArray::from(metas)),
                        Arc::new(StringArray::from(components)),
                        Arc::new(TimestampMillisecondArray::from(start_times).with_timezone("UTC")),
                        Arc::new(TimestampMillisecondArray::from(stop_times).with_timezone("UTC")),
                        Arc::new(Int64Array::from(runtimes)),
                    ],
                )?;

                writer
                    .as_mut()
                    .expect("Writer should be initialized")
                    .write(&batch)?;

                total_archived += batch_count;
                offset += 1000000;

                info!(
                    "Archived batch of {} records to {}",
                    batch_count, archive_filename
                );

                writer
                    .as_mut()
                    .expect("Writer should be initialized")
                    .flush()?;
            }

            if let Some(w) = writer {
                w.close()?;

                info!("Total records archived: {}", total_archived);

                let validated_data = data_validation(&path, pool.clone(), &a, &b).await;

                match validated_data {
                    Ok(record_count) => {
                        println!("Validation is successful for {:?}", &path);

                        deletion_from_db(&a, &b, record_count, pool.clone()).await?;
                    }
                    Err(e) => {
                        println!("{e}. Aborting deletion of data for month/year ");
                    }
                }
            } else {
                info!(
                    "No records found for period {} to {}, skipping file creation",
                    a, b
                );
            }

            month_to_be_archived_lower_bound = month_to_be_archived_lower_bound
                .checked_add_months(archive_interval)
                .unwrap();

            month_to_be_archived_upper_bound = month_to_be_archived_upper_bound
                .checked_add_months(archive_interval)
                .unwrap();
        }

        Ok(())
    }
}

/// Deletes every `auditor_accounting` row whose `stop_time` lies in `[a, b)`, in chunks.
///
/// `a` and `b` are RFC 3339 timestamps. `record_count` is the number of rows that were archived
/// for this period and is used to judge the outcome. Deletion stops as soon as a chunk deletes
/// nothing, or the running total exceeds `record_count`. A mismatch is reported on stdout but is
/// not returned as an `Err`.
///
/// NOTE(review): the range is deleted as a whole, so rows inserted into it after validation are
/// deleted too. That case is only detected (total > `record_count`) after it happened.
///
/// # Errors
/// Returns an error if a DELETE fails or the affected-row count does not fit in `i64`.
async fn deletion_from_db(
    a: &str,
    b: &str,
    record_count: i64,
    pool: PgPool,
) -> anyhow::Result<()> {
    // Deleting in bounded chunks keeps each individual statement small.
    const DELETE_CHUNK_SIZE: i64 = 1000;

    let mut delete_count = 0i64;

    loop {
        let deleted = sqlx::query(
            r"WITH to_delete AS (
    SELECT id
    FROM auditor_accounting
    WHERE stop_time >= $1::timestamptz
      AND stop_time <  $2::timestamptz
    LIMIT $3
)
DELETE FROM auditor_accounting
USING to_delete
WHERE auditor_accounting.id = to_delete.id;",
        )
        .bind(a)
        .bind(b)
        .bind(DELETE_CHUNK_SIZE)
        .execute(&pool)
        .await?
        .rows_affected();

        delete_count += i64::try_from(deleted)?;

        if delete_count > record_count {
            println!(
                "Something went wrong while deletion. Please check the data for the month from this timestamp --> {a} "
            );
            break;
        }

        if deleted == 0 {
            // The period is empty now. Stopping unconditionally here prevents an endless loop
            // of no-op DELETEs when fewer rows than expected were present.
            if delete_count == record_count {
                println!("Record deletion is successful. count --> {delete_count}");
            } else {
                println!(
                    "Record deletion stopped early: deleted {delete_count} of {record_count} archived records. Please check the data for the month from this timestamp --> {a}"
                );
            }
            break;
        }
    }

    Ok(())
}

async fn data_validation(
    parquet_file_path: &PathBuf,
    pool: PgPool,
    a: &String,
    b: &String,
) -> anyhow::Result<i64> {
    println!("checking file path {:?}", &parquet_file_path);
    let file = File::open(parquet_file_path)?;
    let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file)?;

    let parquet_metadata = arrow_reader.metadata();

    let num_rows = parquet_metadata.file_metadata().num_rows();

    println!("Number of rows in '{:?}': {}", &parquet_file_path, num_rows);

    let mut record_count: usize = 0;

    for batch in arrow_reader.build()? {
        record_count = batch?.num_rows().add(record_count);
    }

    println!("record count --> {record_count}");

    let converted_count: i64 = record_count.try_into().expect("Conversion failed");

    let row = sqlx::query("SELECT COUNT(*) AS count FROM auditor_accounting WHERE stop_time >= $1::timestamptz and stop_time < $2::timestamptz")
            .bind(a)
            .bind(b)
            .fetch_one(&pool)
            .await?;

    let count: i64 = row.get("count");

    println!("count_query_from_db --> {:?}", &count);

    if converted_count == count {
        Ok(count)
    } else {
        Err(anyhow::anyhow!(
            "The record count in {:?} --> {} doesn't match the database {}. Suspending archival service",
            parquet_file_path,
            converted_count,
            count
        ))
    }
}