1use async_trait::async_trait;
2use clap::Parser;
3use log::{debug, error, info, trace, warn};
4use tokio::sync::mpsc::Sender;
5use tracing::Level;
6use tracing_subscriber;
7
8use wora::prelude::*;
9
10#[derive(Clone, Debug, Parser)]
11#[command(author, version, about, long_about = "A basic wora example to show off various features")]
12struct BasicAppOpts {
13 #[arg(short, long, default_value_t = 0)]
15 counter: u32,
16
17 #[arg(short, long, default_value_t=log::LevelFilter::Trace)]
19 level: log::LevelFilter,
20}
21
22#[derive(Debug)]
23struct BasicApp {
24 args: BasicAppOpts,
25 counter: u32,
26}
27
28#[async_trait]
29impl App<(), ()> for BasicApp {
30 type AppConfig = NoConfig;
31 type Setup = ();
32 fn name(&self) -> &'static str {
33 "wora_basic"
34 }
35
36 async fn setup(
37 &mut self,
38 _wora: &Wora<(), ()>,
39 _exec: impl AsyncExecutor<(), ()>,
40 _fs: impl WFS,
41 _o11y: Sender<O11yEvent<()>>,
42 ) -> Result<Self::Setup, Box<dyn std::error::Error>> {
43 debug!("command args: {:?}", self.args);
44 Ok(())
45 }
46
47 async fn main(&mut self, _wora: &mut Wora<(), ()>, _exec: impl AsyncExecutor<(), ()>, _fs: impl WFS, _o11y: Sender<O11yEvent<()>>) -> MainRetryAction {
48 trace!("Trace message");
49 debug!("Debug message");
50 info!("Info message");
51 warn!("Warning message");
52 error!("Error message");
53 self.counter += 1;
54
55 MainRetryAction::Success
56 }
57
58 async fn is_healthy(&mut self) -> HealthState {
59 HealthState::Ok
60 }
61
62 async fn end(&mut self, _wora: &Wora<(), ()>, _exec: impl AsyncExecutor<(), ()>, _fs: impl WFS, _o11y: Sender<O11yEvent<()>>) {
63 info!("Final count: {}", self.counter);
64 }
65}
66
67#[tokio::main]
68async fn main() -> Result<(), MainEarlyReturn> {
69 let (tx, mut rx) = tokio::sync::mpsc::channel::<O11yEvent<()>>(10);
70 let _o11y_consumer_task = tokio::spawn(async move {
71 while let Some(res) = rx.recv().await {
72 match res.kind {
73 O11yEventKind::Status(cap, sz) => {
74 println!("{}: status cap:{} max:{}", res.timestamp, cap, sz);
75 }
76 O11yEventKind::App(_O11y) => {}
77 O11yEventKind::HostInfo(_hi) => {}
78 O11yEventKind::HostStats(_hs) => {}
79 O11yEventKind::Flush => {
80 println!("{}: flush", res.timestamp);
81 }
82 O11yEventKind::Finish => {
83 println!("{}: finish", res.timestamp);
84 }
85 O11yEventKind::Init(log_dir) => {
86 println!("{}: init log_dir:{:?}", res.timestamp, log_dir);
87 }
88 O11yEventKind::Log(level, target, name) => {
89 println!("{}: {} target:{} name:{}", res.timestamp, level, target, name);
90 }
91 O11yEventKind::Reconnect => {}
92 O11yEventKind::Clear => {}
93 O11yEventKind::Span(_, _) => {}
94 }
95 }
96 });
97
98 let wob = Observability {
99 tx: tx.clone(),
100 level: Level::INFO,
101 };
102
103 tracing_subscriber::registry().with(wob).init();
104
105 let app_name = "wora_basic";
106
107 let args = BasicAppOpts::parse();
108
109 let app = BasicApp { args: args, counter: 1 };
110
111 let fs = PhysicalVFS::new();
112 let interval = std::time::Duration::from_secs(5);
113
114 let o11y = O11yProcessorOptionsBuilder::default()
115 .sender(tx)
116 .flush_interval(interval.clone())
117 .status_interval(interval.clone())
118 .host_stats_interval(interval.clone())
119 .build()
120 .unwrap();
121 match UnixLikeUser::new(app_name, fs.clone()).await {
122 Ok(exec) => exec_async_runner(exec, app, fs.clone(), o11y).await?,
123 Err(exec_err) => {
124 error!("exec error:{}", exec_err);
125 return Err(MainEarlyReturn::Vfs(exec_err));
126 }
127 }
128
129 Ok(())
130}