use crate::share::{SharedState, Trans};
use rustdb::{Database, Part};
use std::sync::Arc;
use tokio::sync::mpsc;
/// Background task that never completes: waits 10 seconds, calls
/// `ss.u_decay()`, and repeats.
pub async fn u_decay_loop(ss: Arc<SharedState>) {
    let period = core::time::Duration::from_secs(10);
    loop {
        tokio::time::sleep(period).await;
        ss.u_decay();
    }
}
pub async fn backup_loop(is_new: bool, state: Arc<SharedState>) {
if is_new {
let sql = rget(state.clone(), "/log-getall").await;
let sql = std::str::from_utf8(&sql).unwrap().to_string();
let mut st = Trans::new();
st.log = false;
st.x.qy.sql = Arc::new(sql);
st = state.process(st).await;
println!("New replicated database initialised error={}", &st.x.rp.err);
let mut st = Trans::new();
st.log = false;
st.x.qy.sql = Arc::new("EXEC log.InitReplication()".to_string());
let _st = state.process(st).await;
}
let mut fetch = {
let mut st = Trans::new();
st.log = false;
st.x.qy.sql = Arc::new("EXEC log.GetFetch()".to_string());
st = state.process(st).await;
let s = std::str::from_utf8(&st.x.rp.output).unwrap();
s.parse::<u64>().unwrap()
};
println!("Backup from fetch={}", fetch);
loop {
let url = format!("/log-get?k={fetch}");
let ser = rget(state.clone(), &url).await;
if !ser.is_empty() {
let mut st = Trans::new();
let mut part = Part::default();
part.data = Arc::new(ser);
st.x.qy.parts.push(part);
st.x.qy.sql = Arc::new("EXEC log.Save()".to_string());
state.process(st).await;
println!("Saved Transaction Id={fetch}");
fetch += 1;
state.new_trans();
}
}
}
/// Sleeps in 10-second steps until `secs` seconds of wall-clock time
/// (`SystemTime`) have elapsed since the call.
///
/// After every step the wall-clock elapsed time is checked, so the function
/// returns as soon as that reaches `secs`, even if fewer tokio sleeps than
/// planned have completed. It also returns early if `SystemTime::elapsed`
/// reports an error (the clock moved backwards). At most one 10-second step
/// per started block of 10 seconds in `secs` is performed; `secs == 0`
/// returns immediately.
async fn sleep_real(secs: u64) {
    let step = core::time::Duration::from_secs(10);
    let target = core::time::Duration::from_secs(secs);
    let began = std::time::SystemTime::now();

    for _ in (0..secs).step_by(10) {
        tokio::time::sleep(step).await;
        match began.elapsed() {
            // Still short of the target: take another step.
            Ok(elapsed) if elapsed < target => {}
            // Target reached, or the clock went backwards.
            _ => return,
        }
    }
}
/// Performs an HTTP GET of `state.replicate_source + query`, sending
/// `state.replicate_credentials` as the `Cookie` header, and returns the
/// response body. Retries indefinitely until a successful response body has
/// been read.
///
/// Each attempt races `send()` against `sleep_real(800)`:
/// * timeout: a message is printed and the request is retried immediately;
/// * send error, non-success status, or failure reading the body: a message
///   is printed and the request is retried after a 10-second pause.
///
/// The 800-second limit applies to `send()` only; the body is read after the
/// `select!` has already resolved.
///
/// # Panics
///
/// Panics if the `reqwest` client cannot be built.
async fn rget(state: Arc<SharedState>, query: &str) -> Vec<u8> {
    let client = reqwest::Client::builder()
        .default_headers(reqwest::header::HeaderMap::new())
        .build()
        .unwrap();

    loop {
        let request = client
            .get(state.replicate_source.clone() + query)
            .header("Cookie", state.replicate_credentials.clone());

        // `None` means the timeout fired before the request completed.
        let outcome = tokio::select! {
            response = request.send() => Some(response),
            _ = sleep_real(800) => None,
        };

        match outcome {
            None => {
                println!( "rget timed out after 800 seconds" );
                // A timed-out attempt is retried without the extra pause.
                continue;
            }
            Some(Err(e)) => println!("rget send error {e}"),
            Some(Ok(r)) => {
                let status = r.status();
                if !status.is_success() {
                    println!("rget bad response status = {status}");
                } else {
                    match r.bytes().await {
                        Ok(b) => return b.to_vec(),
                        Err(e) => println!("rget failed to get bytes err={e}" ),
                    }
                }
            }
        }

        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
    }
}
/// Timer task: whenever the current interval elapses and `state.is_master` is
/// true, runs `EXEC timed.Run()`.
///
/// The interval starts at 5,000,000 microseconds. Every value received on
/// `rx` replaces it (in microseconds); because a fresh sleep is created on
/// each pass of the loop, receiving a value also restarts the wait.
///
/// # Panics
///
/// Panics if `rx.recv()` yields `None`, i.e. the channel has been closed.
pub async fn sleep_loop(mut rx: mpsc::UnboundedReceiver<u64>, state: Arc<SharedState>) {
    let mut period_micros: u64 = 5_000_000;
    loop {
        let timer = tokio::time::sleep(core::time::Duration::from_micros(period_micros));
        tokio::select! {
            update = rx.recv() => {
                period_micros = update.unwrap();
            }
            _ = timer => {
                if state.is_master {
                    let mut job = Trans::new();
                    job.x.qy.sql = Arc::new("EXEC timed.Run()".to_string());
                    state.process(job).await;
                }
            }
        }
    }
}
/// Email task: each time something arrives on `rx`, sends every message
/// currently listed in the `email.Queue` table.
///
/// One pass of the loop:
/// 1. Waits on `rx.recv()`; the received value (including `None`) is ignored.
/// 2. Opens a read-only view of the database and, for each queue row, looks up
///    the message in `email.Msg` and its account in `email.SmtpAccount`. Rows
///    whose message or account cannot be found are skipped.
/// 3. Sends each collected message with `send_email` on a blocking thread,
///    then records the result: `email_sent` on success, otherwise
///    `email_error` with `retry = 1` only for a transient SMTP send error and
///    `retry = 0` for every other failure.
///
/// # Panics
///
/// Panics if the blocking task running `send_email` fails to join.
pub async fn email_loop(mut rx: mpsc::UnboundedReceiver<()>, state: Arc<SharedState>) {
    // (from, to, title, body, format), taken from columns 0-4 of `email.Msg`.
    type Envelope = (String, String, String, String, i64);
    // (server, username, password), taken from columns 0-2 of `email.SmtpAccount`.
    type Login = (String, String, String);

    /// Collects everything needed to send each queued message. The database
    /// handle lives only inside this function, so it is released before any
    /// sending starts.
    fn queued(state: &SharedState) -> Vec<(u64, Envelope, Login)> {
        let db = Database::new(state.spd.new_reader(), "", state.bmap.clone());
        let queue = db.table("email", "Queue");
        let messages = db.table("email", "Msg");
        let accounts = db.table("email", "SmtpAccount");

        let mut found = Vec::new();
        for (qpp, qoff) in queue.scan(&db) {
            let qpage = &qpp.borrow();
            let msg = queue.access(qpage, qoff).int(0) as u64;

            let (mpp, moff) = match messages.id_get(&db, msg) {
                Some(hit) => hit,
                None => continue,
            };
            let mpage = &mpp.borrow();
            let m = messages.access(mpage, moff);
            let from = m.str(&db, 0);
            let to = m.str(&db, 1);
            let title = m.str(&db, 2);
            let body = m.str(&db, 3);
            let format = m.int(4);
            let account = m.int(5) as u64;

            let (app, aoff) = match accounts.id_get(&db, account) {
                Some(hit) => hit,
                None => continue,
            };
            let apage = &app.borrow();
            let acc = accounts.access(apage, aoff);
            let server = acc.str(&db, 0);
            let username = acc.str(&db, 1);
            let password = acc.str(&db, 2);

            found.push((
                msg,
                (from, to, title, body, format),
                (server, username, password),
            ));
        }
        found
    }

    loop {
        let _ = rx.recv().await;

        for (msg, email, account) in queued(&state) {
            // SMTP delivery is synchronous, so it runs on the blocking pool.
            let outcome = tokio::task::spawn_blocking(move || send_email(email, account))
                .await
                .unwrap();

            let (retry, text) = match outcome {
                Ok(_) => {
                    email_sent(&state, msg).await;
                    continue;
                }
                Err(EmailError::Address(ae)) => (0, ae.to_string()),
                Err(EmailError::Lettre(le)) => (0, le.to_string()),
                Err(EmailError::Send(se)) => (if se.is_transient() { 1 } else { 0 }, se.to_string()),
            };
            email_error(&state, msg, retry, text).await;
        }
    }
}
/// Failure returned by `send_email`, wrapping one of three `lettre` error types.
///
/// `email_loop` logs `Address` and `Lettre` failures with retry = 0, and logs
/// `Send` failures with retry = 1 only when the SMTP error reports itself as
/// transient.
#[derive(Debug)]
enum EmailError {
/// Wraps a `lettre::address::AddressError`.
Address(lettre::address::AddressError),
/// Wraps a `lettre::error::Error`.
Lettre(lettre::error::Error),
/// Wraps a `lettre::transport::smtp::Error`.
Send(lettre::transport::smtp::Error),
}
/// Lets `?` convert a `lettre::address::AddressError` into `EmailError::Address`.
impl From<lettre::address::AddressError> for EmailError {
    fn from(source: lettre::address::AddressError) -> Self {
        Self::Address(source)
    }
}
impl From<lettre::error::Error> for EmailError {
fn from(e: lettre::error::Error) -> Self {
EmailError::Lettre(e)
}
}
impl From<lettre::transport::smtp::Error> for EmailError {
fn from(e: lettre::transport::smtp::Error) -> Self {
EmailError::Send(e)
}
}
fn send_email(
(from, to, title, body, format): (String, String, String, String, i64),
(server, username, password): (String, String, String),
) -> Result<(), EmailError> {
use lettre::{
Message, SmtpTransport, Transport,
message::SinglePart,
transport::smtp::{
PoolConfig,
authentication::{Credentials, Mechanism},
},
};
let body = match format {
1 => SinglePart::html(body),
_ => SinglePart::plain(body),
};
let email = Message::builder()
.to(to.parse()?)
.from(from.parse()?)
.subject(title)
.singlepart(body)?;
let sender = SmtpTransport::starttls_relay(&server)?
.credentials(Credentials::new(username, password))
.authentication(vec![Mechanism::Plain])
.pool_config(PoolConfig::new().max_size(20))
.build();
let _result = sender.send(&email)?;
Ok(())
}
/// Marks message `msg` as sent by running `EXEC email.Sent(msg)`.
async fn email_sent(state: &SharedState, msg: u64) {
    let sql = format!("EXEC email.Sent({})", msg);
    let mut request = Trans::new();
    request.x.qy.sql = Arc::new(sql);
    state.process(request).await;
}
/// Records a failed send for message `msg` by running
/// `EXEC email.LogSendError(msg, retry, 'err')`.
///
/// * `retry` - passed through unchanged as the second argument.
/// * `err` - error text to log. It originates from address parsing or the
///   SMTP exchange and may contain single quotes, which would otherwise end
///   the SQL string literal early and corrupt (or inject into) the statement.
///   Every `'` is therefore doubled before the text is embedded.
async fn email_error(state: &SharedState, msg: u64, retry: i8, err: String) {
    // NOTE(review): assumes the SQL dialect escapes a quote inside a string
    // literal by doubling it - confirm against the rustdb lexer.
    let quoted = err.replace('\'', "''");
    let src = format!("EXEC email.LogSendError({},{},'{}')", msg, retry, quoted);
    let mut st = Trans::new();
    st.x.qy.sql = Arc::new(src);
    state.process(st).await;
}