use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use wire_framework::{
FromContext, IntoContext,
resource::Resource,
service::{ServiceBuilder, StopReceiver},
task::{Task, TaskId},
vlog,
wiring_layer::{WiringError, WiringLayer},
};
/// In-memory key-value store backed by a `HashMap` guarded by a mutex.
///
/// The map lives inside an `Arc`, so values produced by the derived `Clone`
/// share the same underlying storage instead of copying it.
#[derive(Debug, Clone)]
struct MemoryDatabase {
/// Shared, mutex-guarded map from key to value.
data: Arc<Mutex<HashMap<String, String>>>,
}
/// Minimal key-value storage interface used by the tasks in this file.
///
/// The `'static + Send + Sync` bounds let implementations be held as
/// `Arc<dyn Database>` and shared between tasks.
trait Database: 'static + Send + Sync {
/// Stores `value` under `key`.
fn put(&self, key: String, value: String);
/// Returns the value stored under `key`, or `None` if there is none.
fn get(&self, key: String) -> Option<String>;
}
impl Database for MemoryDatabase {
    /// Inserts `value` under `key`, replacing any value previously stored there.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn put(&self, key: String, value: String) {
        let mut entries = self.data.lock().unwrap();
        entries.insert(key, value);
    }

    /// Returns a copy of the value stored under `key`, or `None` if the key is absent.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn get(&self, key: String) -> Option<String> {
        let entries = self.data.lock().unwrap();
        entries.get(key.as_str()).cloned()
    }
}
/// Resource wrapper carrying a shared [`Database`] handle between wiring layers.
#[derive(Clone)]
struct DatabaseResource(pub Arc<dyn Database>);
impl Resource for DatabaseResource {
    /// Returns the name of this resource, `"common/database"`.
    fn name() -> String {
        String::from("common/database")
    }
}
/// Task that periodically writes `key_N` / `value_N` pairs into the database.
struct PutTask {
/// Shared database handle the task writes to.
db: Arc<dyn Database>,
}
impl PutTask {
    /// Writes a new `key_N -> value_N` pair once per second, forever.
    ///
    /// `N` starts at 0 and grows by one after every write. The loop never
    /// exits on its own; the returned future only ends when it is dropped.
    async fn run_inner(self) {
        let pause = Duration::from_secs(1);
        let mut index = 0;
        loop {
            let (key, value) = (format!("key_{}", index), format!("value_{}", index));
            tracing::info!("Put key-value pair: {} -> {}", key, value);
            self.db.put(key, value);
            index += 1;
            tokio::time::sleep(pause).await;
        }
    }
}
#[async_trait::async_trait]
impl Task for PutTask {
    /// Returns the identifier of this task, `"put_task"`.
    fn id(&self) -> TaskId {
        "put_task".into()
    }

    /// Runs the write loop until it finishes or the stop receiver reports a change.
    ///
    /// Always returns `Ok(())`; the result of waiting on the stop receiver is ignored.
    async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> eyre::Result<()> {
        let task_id = self.id();
        tracing::info!("Starting the task {}", task_id);

        // Both futures are lazy; nothing runs until `select!` polls them.
        let work = self.run_inner();
        let stop_requested = stop_receiver.0.changed();
        tokio::select! {
            _ = work => {},
            _ = stop_requested => {},
        }
        Ok(())
    }
}
/// Task that periodically looks up `key_N` entries in the database and logs the result.
struct CheckTask {
/// Shared database handle the task reads from.
db: Arc<dyn Database>,
}
impl CheckTask {
    /// Looks up `key_N` every 800 ms, forever, logging each lookup result.
    ///
    /// `N` starts at 0 and only advances after the current key has been found,
    /// so a missing key is retried on the next iteration. The loop never exits
    /// on its own; the returned future only ends when it is dropped.
    async fn run_inner(self) {
        let poll_interval = Duration::from_millis(800);
        let mut next_index: i32 = 0;
        loop {
            let key = format!("key_{}", next_index);
            let stored = self.db.get(key.clone());
            tracing::info!("Check key-value pair: {} -> {:?}", key, stored);
            // Move on to the next key only once the current one is present.
            next_index += i32::from(stored.is_some());
            tokio::time::sleep(poll_interval).await;
        }
    }
}
#[async_trait::async_trait]
impl Task for CheckTask {
    /// Returns the identifier of this task, `"check_task"`.
    fn id(&self) -> TaskId {
        "check_task".into()
    }

    /// Runs the lookup loop until it finishes or the stop receiver reports a change.
    ///
    /// Always returns `Ok(())`; the result of waiting on the stop receiver is ignored.
    async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> eyre::Result<()> {
        let task_id = self.id();
        tracing::info!("Starting the task {}", task_id);

        // Both futures are lazy; nothing runs until `select!` polls them.
        let work = self.run_inner();
        let stop_requested = stop_receiver.0.changed();
        tokio::select! {
            _ = work => {},
            _ = stop_requested => {},
        }
        Ok(())
    }
}
/// Wiring layer that creates the in-memory database and outputs it as a [`DatabaseResource`].
struct DatabaseLayer;
/// Output of [`DatabaseLayer`]: the database resource it produces.
// NOTE(review): `IntoContext` presumably publishes the fields into the
// framework's context for later layers — confirm against wire_framework docs.
#[derive(IntoContext)]
struct DatabaseLayerOutput {
/// Database resource created by the layer.
pub db: DatabaseResource,
}
#[async_trait::async_trait]
impl WiringLayer for DatabaseLayer {
type Input = ();
type Output = DatabaseLayerOutput;
fn layer_name(&self) -> &'static str {
"database_layer"
}
async fn wire(self, _input: Self::Input) -> Result<Self::Output, WiringError> {
let database = Arc::new(MemoryDatabase {
data: Arc::new(Mutex::new(HashMap::new())),
});
Ok(DatabaseLayerOutput {
db: DatabaseResource(database),
})
}
}
/// Wiring layer that builds [`PutTask`] and [`CheckTask`] on top of the shared database.
struct TasksLayer;
/// Input of [`TasksLayer`]: the database resource its tasks operate on.
// NOTE(review): `FromContext` presumably fills the fields from resources
// wired by earlier layers — confirm against wire_framework docs.
#[derive(FromContext)]
struct TasksLayerInput {
/// Database resource handed to both tasks.
pub db: DatabaseResource,
}
/// Output of [`TasksLayer`]: the two tasks it constructs.
// NOTE(review): `#[context(task)]` presumably tells the derive to register the
// field as a task with the service — confirm against wire_framework docs.
#[derive(IntoContext)]
struct TasksLayerOutput {
/// Task that writes key-value pairs.
#[context(task)]
pub put_task: PutTask,
/// Task that reads the key-value pairs back.
#[context(task)]
pub check_task: CheckTask,
}
#[async_trait::async_trait]
impl WiringLayer for TasksLayer {
type Input = TasksLayerInput;
type Output = TasksLayerOutput;
fn layer_name(&self) -> &'static str {
"tasks_layer"
}
async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let db = input.db.0;
let put_task = PutTask { db: db.clone() };
let check_task = CheckTask { db };
Ok(TasksLayerOutput {
put_task,
check_task,
})
}
}
/// Entry point: sets up logging, wires the database and task layers, and runs the service.
///
/// # Errors
///
/// Returns an error if the service builder cannot be created or if running
/// the service fails.
///
/// # Panics
///
/// Panics if the `"plain"` log configuration cannot be constructed.
fn main() -> eyre::Result<()> {
    let observability_builder = vlog::ObservabilityBuilder::new();
    let logs = vlog::Logs::new("plain").unwrap();
    let observability = observability_builder.with_logs(Some(logs)).build();

    let mut service_builder = ServiceBuilder::new()?;
    service_builder
        .add_layer(DatabaseLayer)
        .add_layer(TasksLayer);

    let service = service_builder.build();
    service.run_with_guard(observability)?;
    Ok(())
}