1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
mod capped_hashset;
pub mod cli;
mod config;
mod exomind;
mod gmail;
mod parsing;
mod sync;

#[macro_use]
extern crate log;
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate serde_derive;

use cli::{LoginOptions, LogoutOptions};
use config::Config;
use exocore::{
    core::futures::{sleep, tokio::time::Instant},
    protos::{prost::ProstAnyPackMessageExt, store::Trait},
    store::{mutation::MutationBuilder, store::Store},
};
use exomind::ExomindClient;
use gmail::{GmailAccount, GmailClient};
use std::{path::Path, time::Duration};
use sync::AccountSynchronizer;

/// Runs the Gmail integration sub-command selected in `opt`.
///
/// The configuration is loaded from `opt.conf` joined onto `node_dir`, and an
/// [`ExomindClient`] is created from `client` before the sub-command is dispatched.
///
/// # Panics
///
/// Panics if the configuration cannot be loaded, if the exomind client cannot be
/// created, or if the selected sub-command returns an error.
pub async fn handle<C: AsRef<Path>>(
    client: exocore::client::Client,
    node_dir: C,
    opt: &cli::Options,
) {
    let config =
        Config::from_file(node_dir.as_ref().join(&opt.conf)).expect("Failed to parse config");
    let exm = ExomindClient::new(client)
        .await
        .expect("Couldn't create exomind client");

    // Every sub-command yields an `anyhow::Result<()>`; collect it first and unwrap once.
    let outcome = match &opt.subcommand {
        cli::Command::Daemon => daemon(config, exm).await,
        cli::Command::ListAccounts => list_accounts(exm).await,
        cli::Command::Login(login_opt) => login(config, login_opt, exm).await,
        cli::Command::Logout(logout_opt) => logout(config, logout_opt, exm).await,
    };
    outcome.unwrap();
}

/// Runs the synchronization daemon.
///
/// One [`AccountSynchronizer`] is created for each account returned by
/// `exm.get_accounts(true)`. The function then loops forever: it runs
/// [`synchronize_accounts`], logs any error it returns, and sleeps 10 seconds before
/// the next pass.
///
/// # Errors
///
/// Returns an error only during start-up, if the accounts cannot be fetched or a
/// Gmail client cannot be created. Once the loop is entered it never returns.
async fn daemon(config: Config, exm: ExomindClient) -> anyhow::Result<()> {
    /// Pause between two synchronization passes.
    const POLL_DELAY: Duration = Duration::from_secs(10);

    info!("Starting a gmail synchronizer");

    let known_accounts = exm.get_accounts(true).await?;

    let mut synchronizers = Vec::new();
    for account in known_accounts {
        let client = GmailClient::new(&config, account.clone()).await?;
        let mut synchronizer = AccountSynchronizer::new(account, exm.clone(), client);

        // Only ever switches the flag on; a synchronizer that already has it set keeps it.
        synchronizer.save_fixtures |= config.save_fixtures;

        synchronizers.push(synchronizer);
    }

    let full_sync_interval: Duration = config.full_sync_interval.into();
    // `None` until the first full synchronization has completed.
    let mut last_full_sync: Option<Instant> = None;

    loop {
        if let Err(e) =
            synchronize_accounts(&mut last_full_sync, full_sync_interval, &mut synchronizers).await
        {
            error!("Error executing synchronization loop: {}", e);
        }

        // TODO: Watch query on exomind
        sleep(POLL_DELAY).await;
    }
}

/// Runs one synchronization pass over every account synchronizer.
///
/// When no full synchronization has been recorded yet, or the last one is older than
/// `full_sync_interval`, the inbox of every account is synchronized first and
/// `last_full_sync` is set to the current instant. Afterwards each account gets
/// `maybe_refresh_client` followed by `synchronize_history`.
///
/// # Errors
///
/// Stops at the first failing step and returns an error naming the account and the
/// underlying cause. `last_full_sync` is left untouched if an inbox synchronization
/// fails.
async fn synchronize_accounts(
    last_full_sync: &mut Option<Instant>,
    full_sync_interval: Duration,
    account_synchronizers: &mut [AccountSynchronizer],
) -> Result<(), anyhow::Error> {
    let full_sync_due = match *last_full_sync {
        Some(previous) => previous.elapsed() > full_sync_interval,
        None => true,
    };

    if full_sync_due {
        for synchronizer in account_synchronizers.iter_mut() {
            let inbox_outcome = synchronizer.synchronize_inbox().await;
            if let Err(err) = inbox_outcome {
                return Err(anyhow!(
                    "failed to fully sync inbox for account {:?}: {}",
                    synchronizer.account,
                    err
                ));
            }
        }
        *last_full_sync = Some(Instant::now());
    }

    for synchronizer in account_synchronizers.iter_mut() {
        let refresh_outcome = synchronizer.maybe_refresh_client().await;
        if let Err(err) = refresh_outcome {
            return Err(anyhow!(
                "failed to refresh client for account {:?}: {}",
                synchronizer.account,
                err
            ));
        }

        let history_outcome = synchronizer.synchronize_history().await;
        if let Err(err) = history_outcome {
            return Err(anyhow!(
                "failed to sync history for account {:?}: {}",
                synchronizer.account,
                err
            ));
        }
    }

    Ok(())
}

/// Registers the Gmail account identified by `opt.email` in the exomind store.
///
/// A [`GmailClient`] is created for the account and its profile is fetched to check
/// that the credentials belong to the requested email address. On success, a trait
/// holding the packed account message is put on the entity `exomind_<email>`, with
/// the email as the trait id.
///
/// # Errors
///
/// Returns an error if the Gmail client cannot be created, the profile cannot be
/// fetched, the profile's email address differs from `opt.email`, the account cannot
/// be packed, or the store mutation fails.
async fn login(config: Config, opt: &LoginOptions, exm: ExomindClient) -> anyhow::Result<()> {
    let account = GmailAccount::from_email(&opt.email);
    let gmc = GmailClient::new(&config, account.clone()).await?;

    let profile = gmc.get_profile().await?;

    // A mismatching token is a recoverable user error, so report it through the
    // function's `Result` instead of panicking.
    if profile.email_address.as_deref() != Some(&opt.email) {
        anyhow::bail!(
            "Token is logged in to a different email. Expected {}, got {:?}",
            opt.email,
            profile.email_address
        );
    }

    let mutations = MutationBuilder::new().put_trait(
        format!("exomind_{}", opt.email),
        Trait {
            id: opt.email.clone(),
            message: Some(account.account.pack_to_any()?),
            ..Default::default()
        },
    );

    let _ = exm.store.mutate(mutations).await?;

    Ok(())
}

/// Removes the Gmail account identified by `opt.email`.
///
/// The account's token file is deleted on a best-effort basis: failing to resolve its
/// path or to remove it is ignored. The entity `exomind_<email>` is then deleted from
/// the store.
///
/// # Errors
///
/// Returns an error if the store mutation fails.
async fn logout(config: Config, opt: &LogoutOptions, exm: ExomindClient) -> anyhow::Result<()> {
    // Best effort only: neither a path lookup failure nor a removal failure aborts the logout.
    if let Ok(token_path) = gmail::account_token_file(&config, &opt.email) {
        let _ = std::fs::remove_file(token_path);
    }

    let entity_id = format!("exomind_{}", opt.email);
    let deletion = MutationBuilder::new().delete_entity(entity_id);
    let _ = exm.store.mutate(deletion).await?;

    Ok(())
}

/// Prints the `Debug` representation of every account returned by
/// `exm.get_accounts(true)` to standard output, one per line.
///
/// # Errors
///
/// Returns an error if the accounts cannot be fetched.
async fn list_accounts(exm: ExomindClient) -> anyhow::Result<()> {
    exm.get_accounts(true)
        .await?
        .into_iter()
        .for_each(|account| println!("{:?}", account));

    Ok(())
}