1mod runtime;
2mod server;
3pub mod utils;
4mod proto {
5 tonic::include_proto!("wacker");
6}
7
8use anyhow::{anyhow, bail, Result};
9use chrono::Local;
10use env_logger::{Builder, Target, WriteStyle};
11use hyper_util::rt::TokioIo;
12use log::{info, warn, LevelFilter};
13use std::fs::{create_dir_all, remove_file};
14use std::future::Future;
15use std::io::{ErrorKind, Write};
16use std::path::{Path, PathBuf};
17use tokio::net::{UnixListener, UnixStream};
18use tokio_stream::wrappers::UnixListenerStream;
19use tonic::{
20 codec::CompressionEncoding,
21 transport::{Channel, Endpoint},
22};
23use tower::service_fn;
24
25pub use self::proto::{
26 wacker_client::WackerClient as Client, DeleteRequest, ListResponse, LogRequest, LogResponse, Program,
27 ProgramResponse, RestartRequest, RunRequest, ServeRequest, StopRequest,
28};
29
30pub const PROGRAM_STATUS_RUNNING: u32 = 0;
31pub const PROGRAM_STATUS_FINISHED: u32 = 1;
32pub const PROGRAM_STATUS_ERROR: u32 = 2;
33pub const PROGRAM_STATUS_STOPPED: u32 = 3;
34
35pub const PROGRAM_TYPE_CLI: u32 = 0;
36pub const PROGRAM_TYPE_HTTP: u32 = 1;
37
38fn get_main_dir() -> Result<PathBuf> {
39 match dirs::home_dir() {
40 Some(home_dir) => Ok(home_dir.join(".wacker")),
41 None => Err(anyhow!("can't get home dir")),
42 }
43}
44
45#[derive(Default)]
46pub struct Server {
47 main_dir: Option<PathBuf>,
48 is_test: bool,
49}
50
51impl Server {
52 pub fn new() -> Self {
53 Default::default()
54 }
55
56 pub fn with_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Self {
57 self.main_dir = Some(dir.as_ref().to_path_buf());
58 self
59 }
60
61 pub fn is_test(&mut self, is_test: bool) -> &mut Self {
62 self.is_test = is_test;
63 self
64 }
65
66 pub async fn start<F: Future<Output = ()> + Send + 'static>(&self, shutdown: F) -> Result<()> {
67 let main_dir = match &self.main_dir {
68 Some(p) => p.clone(),
69 None => get_main_dir()?,
70 };
71
72 let sock_path = main_dir.join("wacker.sock");
73 if sock_path.exists() {
74 bail!("wackerd socket file exists, is wackerd already running?");
75 }
76
77 let logs_dir = main_dir.join("logs");
78 if !logs_dir.exists() {
79 create_dir_all(logs_dir.as_path())?;
80 }
81
82 let db_path = main_dir.join("db");
83 let db = sled::open(db_path)?;
84
85 let mut log_builder = Builder::new();
86 log_builder
87 .format(|buf, record| {
88 writeln!(
89 buf,
90 "[{} {} {}] {}",
91 Local::now().format("%Y-%m-%d %H:%M:%S"),
92 record.level(),
93 record.target(),
94 record.args(),
95 )
96 })
97 .filter_level(LevelFilter::Info)
98 .write_style(WriteStyle::Never)
99 .target(Target::Stdout);
100 if self.is_test {
101 let _ = log_builder.is_test(true).try_init();
102 } else {
103 log_builder.try_init()?;
104 }
105
106 let uds = UnixListener::bind(sock_path.as_path())?;
107 let uds_stream = UnixListenerStream::new(uds);
108 let service = proto::wacker_server::WackerServer::new(server::Server::new(db.clone(), logs_dir).await?)
109 .send_compressed(CompressionEncoding::Zstd)
110 .accept_compressed(CompressionEncoding::Zstd)
111 .send_compressed(CompressionEncoding::Gzip)
112 .accept_compressed(CompressionEncoding::Gzip);
113
114 info!("server listening on {:?}", sock_path.as_path());
115
116 let run = tonic::transport::Server::builder()
117 .add_service(service)
118 .serve_with_incoming_shutdown(uds_stream, async move {
119 shutdown.await;
120 info!("Shutting down the server");
121 if let Err(err) = remove_file(sock_path) {
122 if err.kind() != ErrorKind::NotFound {
123 warn!("failed to remove existing socket file: {}", err);
124 }
125 }
126 if let Err(err) = db.flush_async().await {
127 warn!("failed to flush the db: {}", err);
128 }
129 });
130
131 if self.is_test {
132 tokio::spawn(run);
133 } else {
134 run.await?;
135 }
136 Ok(())
137 }
138}
139
140pub async fn new_client() -> Result<Client<Channel>> {
141 new_client_with_path(get_main_dir()?.join("wacker.sock")).await
142}
143
144pub async fn new_client_with_path<P: AsRef<Path>>(sock_path: P) -> Result<Client<Channel>> {
145 let path = sock_path.as_ref().to_path_buf();
146 let channel = Endpoint::try_from("unix://wacker")?
148 .connect_with_connector(service_fn(move |_| {
149 let path = path.clone();
150 async move {
151 Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
153 }
154 }))
155 .await?;
156
157 Ok(Client::new(channel)
158 .send_compressed(CompressionEncoding::Zstd)
159 .accept_compressed(CompressionEncoding::Zstd)
160 .accept_compressed(CompressionEncoding::Gzip))
161}