sfm-sdk 0.2.28

A simple SDK for reusing SFM entities.
Documentation
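The snippet below is the crate's BigTable ingestion path: it splits MutateRows entries into chunks that respect BigTable's 100,000-mutations-per-request limit, pushes each chunk from its own Tokio task, and retries failed rows with a backoff interval until the configured retry limit is reached.
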
use bigtable_rs::bigtable::{BigTable, Error};
use bigtable_rs::google::bigtable::v2::mutate_rows_request::Entry;
use bigtable_rs::google::bigtable::v2::MutateRowsRequest;
use std::time::Duration;
use tonic::{Code, Status};
use tracing::{error, info, warn};

use crate::utils::backoff::obtain_next_interval;

/// Maps a tonic `Status` returned by BigTable onto an updated retry count:
/// transient codes leave the count unchanged or bump it by one, while
/// unrecoverable codes raise it straight to `retry_limit` so the caller
/// abandons the job.
fn process_bt_status(
    status: Status,
    retry_limit: usize,
    mut retry_count: usize,
    task_table_name: &str,
) -> usize {
    match status.code() {
        // Nothing to do for these codes.
        Code::NotFound | Code::Ok | Code::OutOfRange => {}
        Code::ResourceExhausted => {
            warn!("BigTable resources are currently exhausted. Let's wait.");
        }
        Code::Unknown => {
            error!(
                "There was an issue ingesting {} data to BigTable. {:?}",
                task_table_name, status
            );
            retry_count += 1;
        }
        Code::Cancelled => {
            warn!("BigTable request cancelled on our side. Trying again.");
        }
        Code::InvalidArgument => {
            error!(
                "Invalid argument while ingesting {} data to BigTable. {:?}",
                task_table_name, status
            );
            retry_count = retry_limit;
        }
        Code::DeadlineExceeded => {
            warn!(
                "BigTable DeadlineExceeded. Let's wait; data ingestion might be \
                 too overwhelming at the moment."
            );
        }
        Code::AlreadyExists => {
            error!(
                "Row(s) already exist for {} data. {:?}",
                task_table_name, status
            );
            retry_count = retry_limit;
        }
        Code::PermissionDenied => {
            error!(
                "Invalid permissions for ingesting {} data to BigTable. {:?}",
                task_table_name, status
            );
            retry_count = retry_limit;
        }
        Code::FailedPrecondition => {
            warn!("BigTable failed precondition. Let's wait and try again.");
        }
        Code::Aborted => {
            warn!("BigTable request aborted. Let's wait and try again.");
        }
        Code::Unimplemented => {
            error!(
                "Unimplemented API. | table: {} | reason: {:?}",
                task_table_name, status
            );
            retry_count = retry_limit;
        }
        Code::Internal => {
            error!(
                "BigTable internal error; please report. | table: {} | reason: {:?}",
                task_table_name, status
            );
            retry_count = retry_limit;
        }
        Code::Unavailable => {
            warn!("BigTable resources are currently unavailable. Let's wait.");
        }
        Code::DataLoss => {
            error!(
                "BigTable DataLoss while ingesting {} data. {:?}",
                task_table_name, status
            );
            retry_count = retry_limit;
        }
        Code::Unauthenticated => {
            error!(
                "Invalid authentication for ingesting {} data to BigTable. {:?}",
                task_table_name, status
            );
            retry_count = retry_limit;
        }
    }

    if retry_count >= retry_limit {
        error!("Retry limit reached. Ditching this job.");
    }

    retry_count
}

/// Ingests `entries` into `table_name`, chunked so that each MutateRows request
/// stays within BigTable's 100,000-mutation limit. Each chunk is pushed from its
/// own Tokio task; rows that fail are collected and retried with backoff until
/// they succeed or `retry_limit` is reached.
pub async fn ingest_entries(
    bt_client: &mut BigTable,
    table_name: String,
    table_prefix: String,
    mutations_per_entry: usize,
    entries: Vec<Entry>,
    retry_limit: &usize,
    retry_interval: &Duration,
) {
    let mut bt_tasks: Vec<_> = Vec::new();

    // BigTable's MutateRows RPC accepts at most 100,000 mutations per request,
    // so size each chunk such that a full chunk stays under that limit.
    let chunks = entries.chunks(100_000 / mutations_per_entry);
    let chunk_count = chunks.len();
    for (idx, chunk) in chunks.enumerate() {
        let mut chunk_rows = chunk.to_owned();
        let mut client = bt_client.clone();
        let mut retry_count = 0;
        let task_table_name = table_name.clone();
        let task_table_prefix = table_prefix.clone();
        // usize and Duration are Copy, so plain derefs replace the clones.
        let task_retry_limit = *retry_limit;
        let task_retry_interval = *retry_interval;
        let task_chunk_count = chunk_count;

        info!(
            "Writing chunk {}/{} to BigTable | table: {}",
            idx + 1,
            chunk_count,
            task_table_name
        );

        bt_tasks.push(tokio::spawn(async move {
            loop {
                let push_result = client
                    .mutate_rows(MutateRowsRequest {
                        table_name: client.get_full_table_name(&task_table_name),
                        entries: chunk_rows.clone(),
                        ..MutateRowsRequest::default()
                    })
                    .await;

                match push_result {
                    // The RPC itself failed. Classify the error and only count a
                    // retry against the limit for transient failures; unrecoverable
                    // errors jump straight to the limit so the task bails out.
                    Err(err) => {
                        match err {
                            Error::AccessTokenError(error) => {
                                error!(
                                    "AccessTokenError for ingesting {} data to BigTable. {:?}",
                                    task_table_name, error
                                );
                                retry_count = task_retry_limit;
                            }
                            Error::CertificateError(error) => {
                                error!(
                                    "CertificateError for ingesting {} data to BigTable. {:?}",
                                    task_table_name, error
                                );
                                retry_count = task_retry_limit;
                            }
                            Error::IoError(error) => {
                                error!(
                                    "IoError for ingesting {} data to BigTable. {:?}",
                                    task_table_name, error
                                );
                                retry_count += 1;
                            }
                            Error::TransportError(tonic_tpt_err) => {
                                error!(
                                    "TransportError while ingesting {} data to BigTable. {:?}",
                                    task_table_name, tonic_tpt_err
                                );
                                retry_count += 1;
                            }
                            Error::ChunkError(error) => {
                                error!(
                                    "ChunkError for ingesting {} data to BigTable. {:?}",
                                    task_table_name, error
                                );
                                retry_count = task_retry_limit;
                            }
                            Error::RowWriteFailed => {
                                error!(
                                    "RowWriteFailed while ingesting {} data to BigTable.",
                                    task_table_name
                                );
                                retry_count += 1;
                            }
                            Error::RpcError(tonic_status) => {
                                retry_count = process_bt_status(
                                    tonic_status,
                                    task_retry_limit,
                                    retry_count,
                                    task_table_name.as_str(),
                                );
                            }
                            Error::TimeoutError(seconds) => {
                                error!(
                                    "BigTable ingestion timed out after {} seconds. Trying again | table: {}",
                                    seconds, task_table_name
                                );
                            }
                            // Read-path errors; nothing to do on this write path.
                            Error::ObjectCorrupt(_) | Error::ObjectNotFound(_) | Error::RowNotFound => {}
                        }

                        if retry_count >= task_retry_limit {
                            error!("Retry limit exceeded. Abandoning ship.");
                            break;
                        }

                        // Back off before retrying. The check above ensures
                        // retry_count < task_retry_limit, so the helper is
                        // expected to return Some here.
                        let interval = obtain_next_interval(
                            &retry_count,
                            &task_retry_limit,
                            &task_retry_interval,
                        );

                        tokio::time::sleep(interval.unwrap()).await;
                    }
                    // The request was dispatched; inspect the streamed GCP
                    // response for per-row statuses.
                    Ok(mut gcp_result) => {
                        match gcp_result.message().await {
                            Ok(msg) => {
                                // Ensure GCP actually sent a message back.
                                if let Some(gcp_res) = msg {
                                    // Check every entry pushed; collect any rows
                                    // that failed so they can be re-ingested.
                                    let mut failed = false;
                                    let mut new_rows = Vec::new();
                                    let orig_size = gcp_res.entries.len();
                                    for entry in gcp_res.entries {
                                        if let Some(code) = entry.status {
                                            // A non-zero status code marks a failed row.
                                            if code.code != 0 {
                                                failed = true;
                                                new_rows.push(
                                                    chunk_rows[entry.index as usize].clone(),
                                                );
                                            }
                                        }
                                    }

                                    if failed {
                                        chunk_rows = new_rows;

                                        warn!(
                                            "{}/{} entries failed to ingest. Trying again",
                                            chunk_rows.len(),
                                            orig_size
                                        );
                                    } else {
                                        if task_table_prefix.is_empty() {
                                            info!(
                                                "{}/{} {} push complete!",
                                                idx + 1,
                                                task_chunk_count,
                                                task_table_name
                                            );
                                        } else {
                                            info!(
                                                "{}/{} {}/{} push complete!",
                                                idx + 1,
                                                task_chunk_count,
                                                task_table_name,
                                                task_table_prefix
                                            );
                                        }
                                        break;
                                    }
                                } else {
                                    error!("Empty message from GCP! Trying again...");
                                }
                            }
                            Err(tonic_status) => {
                                retry_count = process_bt_status(
                                    tonic_status,
                                    task_retry_limit,
                                    retry_count,
                                    task_table_name.as_str(),
                                );

                                if retry_count >= task_retry_limit {
                                    error!("Retry limit exceeded. Abandoning ship.");
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        }));
    }
    
    // Wait for every ingestion task to finish; unwrap propagates any task panic.
    for task in bt_tasks {
        task.await.unwrap();
    }
}
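
The backoff helper in crate::utils::backoff is not shown above. Below is a minimal sketch of what it might look like, inferred purely from the call site (it takes the retry state by reference and returns Option<Duration>); the exponential policy here is an assumption, not necessarily the crate's actual curve:

use std::time::Duration;

/// Returns the next sleep interval, or None once the retry limit is reached.
/// Hypothetical implementation; only the signature is taken from the call site.
pub fn obtain_next_interval(
    retry_count: &usize,
    retry_limit: &usize,
    retry_interval: &Duration,
) -> Option<Duration> {
    if retry_count >= retry_limit {
        return None;
    }
    // Exponential backoff: base_interval * 2^retry_count.
    Some(*retry_interval * 2u32.saturating_pow(*retry_count as u32))
}

Calling ingest_entries then looks roughly like the sketch below. The push_events wrapper, table name, column family, and row layout are illustrative only, and ingest_entries is assumed to be in scope from this crate:

use std::time::Duration;

use bigtable_rs::bigtable::BigTable;
use bigtable_rs::google::bigtable::v2::mutate_rows_request::Entry;
use bigtable_rs::google::bigtable::v2::{mutation, Mutation};

/// Hypothetical caller: writes one value per row key.
async fn push_events(bt_client: &mut BigTable, rows: Vec<(String, Vec<u8>)>) {
    // One SetCell mutation per row, so mutations_per_entry is 1.
    let entries: Vec<Entry> = rows
        .into_iter()
        .map(|(row_key, value)| Entry {
            row_key: row_key.into_bytes(),
            mutations: vec![Mutation {
                mutation: Some(mutation::Mutation::SetCell(mutation::SetCell {
                    family_name: "cf".to_string(), // illustrative column family
                    column_qualifier: b"payload".to_vec(),
                    timestamp_micros: -1, // let the server assign the timestamp
                    value,
                })),
            }],
        })
        .collect();

    ingest_entries(
        bt_client,
        "events".to_string(), // illustrative table name
        String::new(),        // no table prefix
        1,                    // mutations per entry
        entries,
        &5,                           // retry limit
        &Duration::from_millis(500),  // base retry interval
    )
    .await;
}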