use std::time::Instant;
use crate::api::public::execute_schema;
use crate::{
agent::{
AgentOptions,
handlers::{self, spawn_handle_db_maintenance},
metrics, setup, util,
},
broadcast::runtime_loop,
transport::Transport,
};
use klukai_types::{
actor::ActorId,
agent::{Agent, BookedVersions, Bookie},
base::CrsqlSeq,
channel::bounded,
config::{Config, PerfConfig},
};
use futures::{FutureExt, StreamExt, TryStreamExt};
use klukai_types::{spawn::spawn_counted, tripwire::Tripwire};
use tokio::task::JoinHandle;
use tracing::{error, info};
/// Builds an agent from `conf` and starts its background tasks.
///
/// Calls `setup` to create the [`Agent`] and its [`AgentOptions`], then hands
/// both to `run` together with the perf section of the configuration.
///
/// Returns the agent, the [`Bookie`] produced by `run`, a clone of the
/// transport taken from the agent options, and the join handles `run`
/// collected.
///
/// # Errors
///
/// Propagates any error returned by `setup` or `run`.
pub async fn start_with_config(
    conf: Config,
    tripwire: Tripwire,
) -> eyre::Result<(Agent, Bookie, Transport, Vec<JoinHandle<()>>)> {
    // Only the perf section is needed once `setup` has run, so clone just that
    // and move the configuration and the tripwire instead of cloning them.
    let pconf = conf.perf.clone();
    let (agent, opts) = setup(conf, tripwire).await?;

    // `run` consumes `opts`, so take the transport handle out beforehand.
    let transport = opts.transport.clone();

    let (bookie, handles) = run(agent.clone(), opts, pconf).await?;

    Ok((agent, bookie, transport, handles))
}
async fn run(
agent: Agent,
opts: AgentOptions,
pconf: PerfConfig,
) -> eyre::Result<(Bookie, Vec<JoinHandle<()>>)> {
let AgentOptions {
gossip_server_endpoint,
transport,
api_listeners,
tripwire,
lock_registry,
rx_bcast,
rx_apply,
rx_clear_buf,
rx_changes,
rx_foca,
subs_manager,
subs_bcast_cache,
updates_bcast_cache,
rtt_rx,
} = opts;
let gossip_addr = gossip_server_endpoint.local_addr()?;
let (to_send_tx, to_send_rx) = bounded(pconf.to_send_channel_len, "to_send");
let (notifications_tx, notifications_rx) =
bounded(pconf.notifications_channel_len, "notifications");
runtime_loop(
agent.actor(None),
agent.clone(),
transport.clone(),
rx_foca,
rx_bcast,
to_send_tx,
notifications_tx,
tripwire.clone(),
);
handlers::spawn_rtt_handler(&agent, rtt_rx);
handlers::spawn_swim_announcer(&agent, gossip_addr, tripwire.clone());
util::initialise_foca(&agent).await;
let stmts = util::read_files_from_paths(&agent.config().db.schema_paths).await?;
if !stmts.is_empty()
&& let Err(e) = execute_schema(&agent, stmts).await
{
error!("could not execute schema: {e}");
}
let mut handles = vec![];
let mut http_handles = util::setup_http_api_handler(
&agent,
&tripwire,
subs_bcast_cache,
updates_bcast_cache,
&subs_manager,
api_listeners,
)
.await?;
handles.append(&mut http_handles);
tokio::spawn(util::clear_buffered_meta_loop(agent.clone(), rx_clear_buf));
tokio::spawn(metrics::metrics_loop(agent.clone(), transport.clone()));
tokio::spawn(handlers::handle_gossip_to_send(
transport.clone(),
to_send_rx,
));
tokio::spawn(handlers::handle_notifications(
agent.clone(),
notifications_rx,
));
spawn_handle_db_maintenance(&agent);
let bookie = Bookie::new_with_registry(Default::default(), lock_registry);
{
let mut w = bookie.write::<&str, _>("init", None).await;
w.insert(agent.actor_id(), agent.booked().clone());
}
let start = Instant::now();
{
let conn = agent.pool().read().await?;
let actor_ids: Vec<ActorId> = conn
.prepare(
"SELECT site_id FROM crsql_site_id WHERE ordinal > 0
UNION
SELECT distinct site_id FROM __corro_seq_bookkeeping",
)?
.query_map([], |row| row.get(0))
.and_then(|rows| rows.collect::<rusqlite::Result<Vec<_>>>())?;
drop(conn);
let pool = agent.pool();
let mut buf = futures::stream::iter(
actor_ids
.into_iter()
.filter(|other_actor_id| *other_actor_id != agent.actor_id())
.map(|actor_id| {
let pool = pool.clone();
async move {
tokio::spawn(async move {
let conn = pool.read().await?;
tokio::task::block_in_place(|| {
BookedVersions::from_conn(&conn, actor_id)
.map(|bv| (actor_id, bv))
.map_err(eyre::Report::from)
})
})
.await?
}
}),
)
.buffer_unordered(4);
while let Some((actor_id, bv)) = TryStreamExt::try_next(&mut buf).await? {
for (version, partial) in bv.partials.iter() {
let gaps_count = partial.seqs.gaps(&(CrsqlSeq(0)..=partial.last_seq)).count();
if gaps_count == 0 {
info!(%actor_id, %version, "found fully buffered, unapplied, changes! scheduling apply");
let tx_apply = agent.tx_apply().clone();
let version = *version;
tokio::spawn(async move {
if let Err(e) = tx_apply.send((actor_id, version)).await {
error!("could not schedule buffered changes application: {e}");
}
});
}
}
bookie
.write::<&str, _>("replace_actor", None)
.await
.replace_actor(actor_id, bv);
}
}
info!("Bookkeeping fully loaded in {:?}", start.elapsed());
spawn_counted(
util::sync_loop(
agent.clone(),
bookie.clone(),
transport.clone(),
tripwire.clone(),
)
.inspect(|_| info!("corrosion agent sync loop is done")),
);
spawn_counted(
util::apply_fully_buffered_changes_loop(
agent.clone(),
bookie.clone(),
rx_apply,
tripwire.clone(),
)
.inspect(|_| info!("corrosion buffered changes loop is done")),
);
info!("Starting peer API on udp/{gossip_addr} (QUIC)");
handlers::spawn_gossipserver_handler(&agent, &bookie, &tripwire, gossip_server_endpoint);
let changes_handle = spawn_counted(
handlers::handle_changes(agent.clone(), bookie.clone(), rx_changes, tripwire.clone())
.inspect(|_| info!("corrosion handle changes loop is done")),
);
handles.push(changes_handle);
Ok((bookie, handles))
}