reinfer-cli 0.38.5

Command line interface for Re:infer, the conversational data intelligence platform
use crate::{
    commands::DEFAULT_TRANSFORM_TAG,
    parse::{get_files_in_directory, Statistics},
};
use anyhow::{anyhow, Context, Result};
use cfb::CompoundFile;
use colored::Colorize;
use log::error;
use once_cell::sync::Lazy;
use regex::Regex;
use std::{io::Read, sync::Arc};

use reinfer_client::{
    resources::{
        attachments::AttachmentMetadata,
        documents::{Document, RawEmail, RawEmailBody, RawEmailHeaders},
    },
    Client, PropertyMap, SourceIdentifier, TransformTag,
};
use std::{
    fs::File,
    path::{Path, PathBuf},
};
use structopt::StructOpt;

use crate::{
    commands::ensure_uip_user_consents_to_ai_unit_charge,
    progress::{Options as ProgressOptions, Progress},
};

use super::upload_batch_of_documents;

const MSG_NAME_USER_PROPERTY_NAME: &str = "MSG NAME ID";
const STREAM_PATH_ATTACHMENT_STORE_PREFIX: &str = "__attach_version1.0_#";
const UPLOAD_BATCH_SIZE: usize = 128;

static CONTENT_TYPE_MIME_HEADER_RX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"Content-Type:((\s)+.+\n)+").unwrap());
static CONTENT_TRANSFER_ENCODING_MIME_HEADER_RX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"Content-Transfer-Encoding:((\s)+.+\n)+").unwrap());
static STREAM_PATH_MESSAGE_BODY_PLAIN: Lazy<PathBuf> =
    Lazy::new(|| PathBuf::from("__substg1.0_1000001F"));
static STREAM_PATH_MESSAGE_HEADER: Lazy<PathBuf> =
    Lazy::new(|| PathBuf::from("__substg1.0_007d001F"));
static STREAM_PATH_ATTACHMENT_FILENAME: Lazy<PathBuf> =
    Lazy::new(|| PathBuf::from("__substg1.0_3707001F"));
static STREAM_PATH_ATTACHMENT_EXTENSION: Lazy<PathBuf> =
    Lazy::new(|| PathBuf::from("__substg1.0_3703001F"));
static STREAM_PATH_ATTACHMENT_DATA: Lazy<PathBuf> =
    Lazy::new(|| PathBuf::from("__substg1.0_37010102"));

#[derive(Debug, StructOpt)]
pub struct ParseMsgArgs {
    #[structopt(short = "d", long = "dir", parse(from_os_str))]
    /// Directory containing the msgs
    directory: PathBuf,

    #[structopt(short = "s", long = "source")]
    /// Source name or id
    source: SourceIdentifier,

    #[structopt(long = "transform-tag")]
    /// Transform tag to use.
    transform_tag: Option<TransformTag>,

    #[structopt(short = "n", long = "no-charge")]
    /// Whether to attempt to bypass billing (internal only)
    no_charge: bool,

    #[structopt(short = "y", long = "yes")]
    /// Consent to ai unit charge. Suppresses confirmation prompt.
    yes: bool,
}

fn read_stream(stream_path: &Path, compound_file: &mut CompoundFile<File>) -> Result<Vec<u8>> {
    let data = {
        let mut stream = compound_file.open_stream(stream_path)?;
        let mut buffer = Vec::new();
        stream.read_to_end(&mut buffer)?;
        buffer
    };

    Ok(data)
}

fn read_unicode_stream_to_string(
    stream_path: &Path,
    compound_file: &mut CompoundFile<File>,
) -> Result<String> {
    if !compound_file.is_stream(stream_path) {
        return Err(anyhow!(
            "Could not find stream {}. Please check that you are using unicode msgs",
            stream_path.to_string_lossy()
        ));
    }

    // Stream data is a UTF16 string encoded as Vec[u8]
    let data = read_stream(stream_path, compound_file)?;
    Ok(utf16le_stream_to_string(&data))
}

// Decode a UTF-16LE data stream, given as raw bytes of Vec<u8>
fn utf16le_stream_to_string(data: &[u8]) -> String {
    let mut decoder = encoding_rs::UTF_16LE.new_decoder();

    // The amount of memory to reserve for writing at a time
    // We should only require one or two blocks for the vast majority of cases
    let block_length = data.len();
    let mut buffer: String = String::with_capacity(block_length);

    loop {
        let (coder_result, _, _) = decoder.decode_to_string(data, &mut buffer, true);
        use encoding_rs::CoderResult;
        match coder_result {
            // The output buffer was not big enough - increase and retry
            CoderResult::OutputFull => buffer.reserve(block_length),
            CoderResult::InputEmpty => return buffer,
        }
    }
}

fn get_attachment_store_path(attachment_number: usize) -> PathBuf {
    PathBuf::from(format!(
        "{STREAM_PATH_ATTACHMENT_STORE_PREFIX}{attachment_number:08}"
    ))
}

fn read_attachment(
    attachment_path: PathBuf,
    compound_file: &mut CompoundFile<File>,
) -> Result<AttachmentMetadata> {
    let mut attachment_name_path = attachment_path.clone();
    attachment_name_path.push(&*STREAM_PATH_ATTACHMENT_FILENAME);

    let mut content_type_path = attachment_path.clone();
    content_type_path.push(&*STREAM_PATH_ATTACHMENT_EXTENSION);

    let mut data_path = attachment_path.clone();
    data_path.push(&*STREAM_PATH_ATTACHMENT_DATA);

    let name = read_unicode_stream_to_string(&attachment_name_path, compound_file)?;
    let content_type = read_unicode_stream_to_string(&content_type_path, compound_file)?;
    let data = read_stream(&data_path, compound_file)?;

    Ok(AttachmentMetadata {
        name,
        content_type,
        size: data.len() as u64,
        attachment_reference: None,
        content_hash: None,
    })
}

fn remove_content_headers(headers_string: String) -> Result<String> {
    let mut clean_headers_string: String;

    clean_headers_string = CONTENT_TYPE_MIME_HEADER_RX
        .replace(&headers_string, "")
        .to_string();

    clean_headers_string = CONTENT_TRANSFER_ENCODING_MIME_HEADER_RX
        .replace(&clean_headers_string, "")
        .to_string();

    Ok(clean_headers_string)
}

fn read_msg_to_document(path: &PathBuf) -> Result<Document> {
    if !path.is_file() {
        return Err(anyhow!("No such file: {:?}", path));
    }

    let mut compound_file = cfb::open(path)?;

    // Headers
    let headers_string =
        read_unicode_stream_to_string(&STREAM_PATH_MESSAGE_HEADER, &mut compound_file)?;

    // As the content type won't match the parsed value from the body in the msg
    let headers_string_no_content_headers = remove_content_headers(headers_string)?;

    let plain_body_string =
        read_unicode_stream_to_string(&STREAM_PATH_MESSAGE_BODY_PLAIN, &mut compound_file)?;

    // Attachments
    let mut attachment_number = 0;
    let mut attachments = Vec::new();
    loop {
        let attachment_path = get_attachment_store_path(attachment_number);

        if compound_file.is_storage(&attachment_path) {
            attachments.push(read_attachment(attachment_path, &mut compound_file)?);
        } else {
            break;
        }

        attachment_number += 1;
    }

    // User Properties
    let mut user_properties = PropertyMap::new();
    user_properties.insert_string(
        MSG_NAME_USER_PROPERTY_NAME.to_string(),
        path.file_name()
            .context("Could not get file name")?
            .to_string_lossy()
            .to_string(),
    );

    Ok(Document {
        raw_email: RawEmail {
            body: RawEmailBody::Plain(plain_body_string),
            headers: RawEmailHeaders::Raw(headers_string_no_content_headers),
            attachments,
        },
        user_properties,
        comment_id: None,
    })
}

pub fn parse(client: &Client, args: &ParseMsgArgs) -> Result<()> {
    let ParseMsgArgs {
        directory,
        source,
        transform_tag,
        no_charge,
        yes,
    } = args;

    if !no_charge && !yes {
        ensure_uip_user_consents_to_ai_unit_charge(client.base_url())?;
    }

    let msg_paths = get_files_in_directory(directory, "msg", true)?;
    let statistics = Arc::new(Statistics::new());
    let _progress = get_progress_bar(msg_paths.len() as u64, &statistics);
    let source = client.get_source(source.clone())?;
    let transform_tag = transform_tag
        .clone()
        .unwrap_or(DEFAULT_TRANSFORM_TAG.clone());

    let mut documents = Vec::new();
    let mut errors = Vec::new();

    let send = |documents: &mut Vec<Document>| -> Result<()> {
        upload_batch_of_documents(
            client,
            &source,
            documents,
            &transform_tag,
            *no_charge,
            &statistics,
        )?;
        documents.clear();
        Ok(())
    };

    for path in msg_paths {
        match read_msg_to_document(&path.path()) {
            Ok(document) => {
                documents.push(document);

                if documents.len() >= UPLOAD_BATCH_SIZE {
                    send(&mut documents)?;
                }
                statistics.increment_processed();
            }
            Err(error) => {
                errors.push(format!(
                    "Failed to process file {}: {}",
                    path.file_name().to_string_lossy(),
                    error
                ));
                statistics.increment_failed();
                statistics.increment_processed();
            }
        }
    }

    send(&mut documents)?;

    for error in errors {
        error!("{error}");
    }

    Ok(())
}

fn get_progress_bar(total_bytes: u64, statistics: &Arc<Statistics>) -> Progress {
    Progress::new(
        move |statistic| {
            let num_processed = statistic.num_processed();
            let num_failed = statistic.num_failed();
            let num_uploaded = statistic.num_uploaded();
            (
                num_processed as u64,
                format!(
                    "{} {} {} {} {} {}",
                    num_processed.to_string().bold(),
                    "processed".dimmed(),
                    num_failed.to_string().bold(),
                    "failed".dimmed(),
                    num_uploaded.to_string().bold(),
                    "uploaded".dimmed()
                ),
            )
        },
        statistics,
        Some(total_bytes),
        ProgressOptions { bytes_units: false },
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    #[test]
    fn test_read_msg_to_document_non_unicode() {
        let result = read_msg_to_document(&PathBuf::from("tests/samples/non-unicode.msg"));

        assert_eq!(result.expect_err("Expected Error Result").to_string(), "Could not find stream __substg1.0_007d001F. Please check that you are using unicode msgs");
    }

    #[test]
    fn test_read_msg_to_document_unicode() {
        let mut expected_user_properties = PropertyMap::new();
        expected_user_properties
            .insert_string("MSG NAME ID".to_string(), "unicode.msg".to_string());

        let expected_headers = "Received: from DB8PR02MB5883.eurprd02.prod.outlook.com (2603:10a6:10:116::17)\r\n by AM6PR02MB4215.eurprd02.prod.outlook.com with HTTPS; Wed, 25 Oct 2023\r\n 17:03:35 +0000\r\nAuthentication-Results: dkim=none (message not signed)\r\n header.d=none;dmarc=none action=none header.from=uipath.com;\r\nReceived: from AM9PR02MB6642.eurprd02.prod.outlook.com (2603:10a6:20b:2d2::18)\r\n by DB8PR02MB5883.eurprd02.prod.outlook.com (2603:10a6:10:116::17) with\r\n Microsoft SMTP Server (version=TLS1_2,\r\n cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id 15.20.6907.33; Wed, 25 Oct\r\n 2023 17:03:23 +0000\r\nReceived: from AM9PR02MB6642.eurprd02.prod.outlook.com\r\n ([fe80::fb6e:4d16:d9ba:33ae]) by AM9PR02MB6642.eurprd02.prod.outlook.com\r\n ([fe80::fb6e:4d16:d9ba:33ae%6]) with mapi id 15.20.6933.019; Wed, 25 Oct 2023\r\n 17:03:23 +0000\r\nFrom: Joe Prosser <joe.prosser@uipath.com>\r\nTo: Andra Buica <andra.buica@uipath.com>\r\nSubject: Re: Testing the CLI!!\r\nThread-Topic: Testing the CLI!!\r\nThread-Index: AQHaB2N76HBc3H6u4Eeu9hAZAXjmE7BauHazgAAAIR2AAAJRsw==\r\nDate: Wed, 25 Oct 2023 17:03:22 +0000\r\nMessage-ID:\r\n <AM9PR02MB66424EB36E9581626499575190DEA@AM9PR02MB6642.eurprd02.prod.outlook.com>\r\nReferences:\r\n <AM9PR02MB664276669FA004F9D7AE85F890DEA@AM9PR02MB6642.eurprd02.prod.outlook.com>\r\n <AM6PR02MB4215248605797A7CCC598DA28EDEA@AM6PR02MB4215.eurprd02.prod.outlook.com>\r\n <AM9PR02MB664263ADFB43C2AA252F510F90DEA@AM9PR02MB6642.eurprd02.prod.outlook.com>\r\nIn-Reply-To:\r\n <AM9PR02MB664263ADFB43C2AA252F510F90DEA@AM9PR02MB6642.eurprd02.prod.outlook.com>\r\nAccept-Language: en-GB, en-US\r\nContent-Language: en-GB\r\nX-MS-Has-Attach: yes\r\nX-MS-Exchange-Organization-SCL: 1\r\nX-MS-TNEF-Correlator:\r\n <AM9PR02MB66424EB36E9581626499575190DEA@AM9PR02MB6642.eurprd02.prod.outlook.com>\r\nmsip_labels:\r\nMIME-Version: 1.0\r\nX-MS-Exchange-Organization-MessageDirectionality: Originating\r\nX-MS-Exchange-Organization-AuthSource: AM9PR02MB6642.eurprd02.prod.outlook.com\r\nX-MS-Exchange-Organization-AuthAs: Internal\r\nX-MS-Exchange-Organization-AuthMechanism: 04\r\nX-MS-Exchange-Organization-Network-Message-Id:\r\n 178907c2-2871-4221-a75a-08dbd57c4bf2\r\nX-MS-PublicTrafficType: Email\r\nX-MS-TrafficTypeDiagnostic:\r\n AM9PR02MB6642:EE_|DB8PR02MB5883:EE_|AM6PR02MB4215:EE_\r\nReturn-Path: joe.prosser@uipath.com\r\nX-MS-Exchange-Organization-ExpirationStartTime: 25 Oct 2023 17:03:23.2098\r\n (UTC)\r\nX-MS-Exchange-Organization-ExpirationStartTimeReason: OriginalSubmit\r\nX-MS-Exchange-Organization-ExpirationInterval: 1:00:00:00.0000000\r\nX-MS-Exchange-Organization-ExpirationIntervalReason: OriginalSubmit\r\nX-MS-Office365-Filtering-Correlation-Id: 178907c2-2871-4221-a75a-08dbd57c4bf2\r\nX-MS-Exchange-AtpMessageProperties: SA|SL\r\nX-Microsoft-Antispam: BCL:0;\r\nX-Forefront-Antispam-Report:\r\n CIP:255.255.255.255;CTRY:;LANG:en;SCL:1;SRV:;IPV:NLI;SFV:NSPM;H:AM9PR02MB6642.eurprd02.prod.outlook.com;PTR:;CAT:NONE;SFS:;DIR:INT;\r\nX-MS-Exchange-CrossTenant-OriginalArrivalTime: 25 Oct 2023 17:03:22.9433\r\n (UTC)\r\nX-MS-Exchange-CrossTenant-FromEntityHeader: Hosted\r\nX-MS-Exchange-CrossTenant-Id: d8353d2a-b153-4d17-8827-902c51f72357\r\nX-MS-Exchange-CrossTenant-AuthSource: AM9PR02MB6642.eurprd02.prod.outlook.com\r\nX-MS-Exchange-CrossTenant-AuthAs: Internal\r\nX-MS-Exchange-CrossTenant-Network-Message-Id: 178907c2-2871-4221-a75a-08dbd57c4bf2\r\nX-MS-Exchange-CrossTenant-MailboxType: HOSTED\r\nX-MS-Exchange-CrossTenant-UserPrincipalName: ++FDvUGfWQ/f3Ycjz3vJlna939o3V0zI8K8BPWCQ7aaQDhAJYe5XMf5Tr6B6e/5ucWri5/sUCnvrb1GDymXZmA==\r\nX-MS-Exchange-Transport-CrossTenantHeadersStamped: DB8PR02MB5883\r\nX-MS-Exchange-Transport-EndToEndLatency: 00:00:12.3444456\r\nX-MS-Exchange-Processed-By-BccFoldering: 15.20.6907.032\r\nX-Microsoft-Antispam-Mailbox-Delivery:\r\n\tucf:0;jmr:0;auth:0;dest:I;ENG:(910001)(944506478)(944626604)(920097)(425001)(930097)(140003)(1420103);\r\nX-Microsoft-Antispam-Message-Info:\r\n\thxMOcmClDYIwXiLHDHfW3mj4bIuPkLWLYD9z839jTLrsdaQbyDGWXui95ou9iKuUSUHzubxtiSwgj9OVTVInYaJ6SuN1imGY/n99Cn1vAx4VAh1xSYBUsm2bHu5atiwnMZTQaWg7BA03Yj6BSrOYqh49oLJrh9blZmtlkMs4uCi1Y5Y7YpEMpzLi7ye2fg+gPqELrNLHIKfziazCdrPNLKQ+9tlbb6UHbxeX1YVGY2ebnYXPwRilEP++cljm3hV8abvlgC1GF+nsuIS5XJwhkHmgTyetr7iZE3GWR9XC0NsB9w5bQgg82r75ozlGKWVKkuev1IlRjStpKaJbOoPBvo/6SCqSbWaQinyUEfPDXiJOYHW3D0xsf4uCpbFpb1D+TpQr+dFaCxBNdmFjBrd/SCHoyyl9QAO1nz0W5cUAwjSDKN7Pv7iKIUlx24nkDCeYeVqKhNcqulwIlEP6ewBBL2BTSplNPApIbliiHhh/Z6mes1xx3dfB5T4tIUv6wINSH3G2Ddec2fLzeTHknuyaOA9Wj8ks+JIjE+i5/CPidxfH4ACoggwbdnLwwnwniGcNoZKdyM+G4whogPe2oKXPggX9er/44bUOlKEcK5DsPFEZX2xKzsg7JwPPLcgO/lbP2iN/yFww6I6vri27P29a86np21iNSGOb51giFj5wgSq4iZLeRe64cEj4+i4K1KosNLBgDYTj/WGrioHl5Xe9ww==\r\n";

        let expected_body = "Hey, \r\n\r\nWe should check that attachments work to ✅\r\n\r\nJoe \r\n________________________________\r\n\r\nFrom: Joe Prosser <joe.prosser@uipath.com>\r\nSent: 25 October 2023 17:53\r\nTo: Andra Buica <andra.buica@uipath.com>\r\nSubject: Re: Testing the CLI!! \r\n \r\nHey,\r\n\r\nFingers cross  🤞 \r\n\r\nJoe\r\n________________________________\r\n\r\nFrom: Andra Buica <andra.buica@uipath.com>\r\nSent: 25 October 2023 17:52\r\nTo: Joe Prosser <joe.prosser@uipath.com>\r\nSubject: Re: Testing the CLI!! \r\n \r\n\r\nHey, \r\n\r\n \r\n\r\nHopefully it will work! \r\n\r\n \r\n\r\nAndra\r\n\r\n \r\n\r\nFrom: Joe Prosser <joe.prosser@uipath.com>\r\nDate: Wednesday, 25 October 2023 at 17:52\r\nTo: Andra Buica <andra.buica@uipath.com>\r\nSubject: Testing the CLI!!\r\n\r\nHey,\r\n\r\n \r\n\r\nDo you think it will work?\r\n\r\n \r\n\r\nJoe \r\n\r\n";

        let expected_attachments = vec![
            AttachmentMetadata {
                name: "hello.txt".to_string(),
                size: 12,
                content_type: ".txt".to_string(),
                attachment_reference: None,
                content_hash: None,
            },
            AttachmentMetadata {
                name: "world.pdf".to_string(),
                size: 7302,
                content_type: ".pdf".to_string(),
                attachment_reference: None,
                content_hash: None,
            },
        ];

        let expected_document = Document {
            comment_id: None,
            raw_email: RawEmail {
                body: RawEmailBody::Plain(expected_body.to_string()),
                headers: RawEmailHeaders::Raw(expected_headers.to_string()),
                attachments: expected_attachments,
            },
            user_properties: expected_user_properties,
        };

        let actual_document = read_msg_to_document(&PathBuf::from("tests/samples/unicode.msg"))
            .expect("Failed to read msg");

        assert_eq!(expected_document, actual_document);
    }
}