use std::{
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
pub(crate) struct AtomicTimestamp(AtomicU64);
impl AtomicTimestamp {
pub(crate) fn now() -> Self {
Self(AtomicU64::new(Self::current_micros()))
}
pub(crate) fn store_now(&self) {
self.0.store(Self::current_micros(), Ordering::Relaxed);
}
pub(crate) fn elapsed(&self) -> Duration {
let last = self.0.load(Ordering::Relaxed);
let now = Self::current_micros();
Duration::from_micros(now.saturating_sub(last))
}
fn current_micros() -> u64 {
linera_base::time::SystemTime::now()
.duration_since(linera_base::time::UNIX_EPOCH)
.map(|d| d.as_micros() as u64)
.unwrap_or(0)
}
}
use linera_base::{
data_types::{BlockHeight, Timestamp},
identifiers::ChainId,
time::Duration,
};
use linera_execution::{QueryContext, ServiceRuntimeEndpoint, ServiceSyncRuntime};
use linera_storage::Storage;
use tokio::sync::{OwnedRwLockReadGuard, RwLock};
use super::{config::ChainWorkerConfig, state::ChainWorkerState};
pub(crate) struct RollbackGuard<S: Storage + Clone + 'static>(
tokio::sync::OwnedRwLockWriteGuard<ChainWorkerState<S>>,
);
impl<S: Storage + Clone + 'static> Deref for RollbackGuard<S> {
type Target = ChainWorkerState<S>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<S: Storage + Clone + 'static> DerefMut for RollbackGuard<S> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<S: Storage + Clone + 'static> Drop for RollbackGuard<S> {
fn drop(&mut self) {
self.0.rollback();
}
}
pub(crate) struct ServiceRuntimeActor {
pub(crate) task: web_thread_pool::Task<()>,
pub(crate) endpoint: ServiceRuntimeEndpoint,
}
impl ServiceRuntimeActor {
pub(crate) async fn spawn(
chain_id: ChainId,
thread_pool: &linera_execution::ThreadPool,
) -> Self {
let (execution_state_sender, incoming_execution_requests) =
futures::channel::mpsc::unbounded();
let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel();
Self {
endpoint: ServiceRuntimeEndpoint {
incoming_execution_requests,
runtime_request_sender,
},
task: thread_pool
.run((), move |()| async move {
ServiceSyncRuntime::new(
execution_state_sender,
QueryContext {
chain_id,
next_block_height: BlockHeight(0),
local_time: Timestamp::from(0),
},
)
.run(runtime_request_receiver)
})
.await,
}
}
}
pub(crate) fn create_chain_worker<S: Storage + Clone + 'static>(
state: ChainWorkerState<S>,
is_tracked: bool,
config: &ChainWorkerConfig,
) -> Arc<RwLock<ChainWorkerState<S>>> {
let last_access = state.last_access_arc();
let chain_id = state.chain().chain_id();
let arc = Arc::new(RwLock::new(state));
let ttl = if is_tracked {
config.ttl
} else {
config.sender_chain_ttl
};
if let Some(ttl) = ttl {
spawn_keep_alive(chain_id, Arc::clone(&arc), last_access, ttl);
}
arc
}
pub(crate) async fn read_lock<S: Storage + Clone + 'static>(
state: &Arc<RwLock<ChainWorkerState<S>>>,
) -> OwnedRwLockReadGuard<ChainWorkerState<S>> {
let guard = state.clone().read_owned().await;
guard.touch();
guard
}
pub(crate) async fn read_lock_initialized<S: Storage + Clone + 'static>(
state: &Arc<RwLock<ChainWorkerState<S>>>,
) -> Result<OwnedRwLockReadGuard<ChainWorkerState<S>>, crate::worker::WorkerError> {
{
let guard = read_lock(state).await;
if guard.knows_chain_is_active() {
return Ok(guard);
}
}
let state_for_task = state.clone();
linera_base::task::run_detached(async move {
let mut guard = write_lock(&state_for_task).await;
guard.initialize_and_save_if_needed().await
})
.await?;
Ok(read_lock(state).await)
}
pub(crate) async fn write_lock<S: Storage + Clone + 'static>(
state: &Arc<RwLock<ChainWorkerState<S>>>,
) -> RollbackGuard<S> {
let guard = RollbackGuard(state.clone().write_owned().await);
guard.touch();
guard
}
fn spawn_keep_alive<S: Storage + Clone + 'static>(
chain_id: ChainId,
mut state: Arc<RwLock<ChainWorkerState<S>>>,
last_access: Arc<AtomicTimestamp>,
ttl: Duration,
) {
linera_base::Task::spawn(async move {
loop {
while let Some(remaining) = ttl
.checked_sub(last_access.elapsed())
.filter(|remaining| *remaining > Duration::ZERO)
{
linera_base::time::timer::sleep(remaining).await;
}
match Arc::try_unwrap(state) {
Ok(rw_lock) => {
tracing::debug!(%chain_id, "Dropping chain worker");
let mut worker_state = RwLock::into_inner(rw_lock);
let task = worker_state.clear_service_runtime();
drop(worker_state);
if let Some(task) = task {
if let Err(err) = task.await {
tracing::warn!(%err, "Failed to shut down service runtime");
}
}
break;
}
Err(arc) => {
arc.read().await.touch();
state = arc;
}
}
}
})
.forget();
}