wacker/
lib.rs

1mod runtime;
2mod server;
3pub mod utils;
4mod proto {
5    tonic::include_proto!("wacker");
6}
7
8use anyhow::{anyhow, bail, Result};
9use chrono::Local;
10use env_logger::{Builder, Target, WriteStyle};
11use hyper_util::rt::TokioIo;
12use log::{info, warn, LevelFilter};
13use std::fs::{create_dir_all, remove_file};
14use std::future::Future;
15use std::io::{ErrorKind, Write};
16use std::path::{Path, PathBuf};
17use tokio::net::{UnixListener, UnixStream};
18use tokio_stream::wrappers::UnixListenerStream;
19use tonic::{
20    codec::CompressionEncoding,
21    transport::{Channel, Endpoint},
22};
23use tower::service_fn;
24
25pub use self::proto::{
26    wacker_client::WackerClient as Client, DeleteRequest, ListResponse, LogRequest, LogResponse, Program,
27    ProgramResponse, RestartRequest, RunRequest, ServeRequest, StopRequest,
28};
29
// Program status codes.
// NOTE(review): presumably these mirror the status value carried by `Program`
// in the generated proto types — confirm against the .proto definition.

/// Status code `0`: the program is running.
pub const PROGRAM_STATUS_RUNNING: u32 = 0;
/// Status code `1`: the program has finished.
pub const PROGRAM_STATUS_FINISHED: u32 = 1;
/// Status code `2`: the program ended with an error.
pub const PROGRAM_STATUS_ERROR: u32 = 2;
/// Status code `3`: the program was stopped.
pub const PROGRAM_STATUS_STOPPED: u32 = 3;

// Program type codes.
// NOTE(review): presumably CLI corresponds to `RunRequest` and HTTP to
// `ServeRequest` — confirm against the server implementation.

/// Type code `0`: a command-line program.
pub const PROGRAM_TYPE_CLI: u32 = 0;
/// Type code `1`: an HTTP program.
pub const PROGRAM_TYPE_HTTP: u32 = 1;
37
38fn get_main_dir() -> Result<PathBuf> {
39    match dirs::home_dir() {
40        Some(home_dir) => Ok(home_dir.join(".wacker")),
41        None => Err(anyhow!("can't get home dir")),
42    }
43}
44
/// Builder-style configuration for starting the wacker server.
///
/// `Debug` and `Clone` are derived so the configuration can be logged and
/// duplicated by callers; both fields are plain standard-library types.
#[derive(Debug, Clone, Default)]
pub struct Server {
    /// Directory holding the socket file, the `logs` directory and the `db`.
    /// When `None`, `start` falls back to `get_main_dir()`.
    main_dir: Option<PathBuf>,
    /// When `true`, `start` initializes the logger in test mode and spawns the
    /// server in the background instead of awaiting it.
    is_test: bool,
}
50
51impl Server {
52    pub fn new() -> Self {
53        Default::default()
54    }
55
56    pub fn with_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Self {
57        self.main_dir = Some(dir.as_ref().to_path_buf());
58        self
59    }
60
61    pub fn is_test(&mut self, is_test: bool) -> &mut Self {
62        self.is_test = is_test;
63        self
64    }
65
66    pub async fn start<F: Future<Output = ()> + Send + 'static>(&self, shutdown: F) -> Result<()> {
67        let main_dir = match &self.main_dir {
68            Some(p) => p.clone(),
69            None => get_main_dir()?,
70        };
71
72        let sock_path = main_dir.join("wacker.sock");
73        if sock_path.exists() {
74            bail!("wackerd socket file exists, is wackerd already running?");
75        }
76
77        let logs_dir = main_dir.join("logs");
78        if !logs_dir.exists() {
79            create_dir_all(logs_dir.as_path())?;
80        }
81
82        let db_path = main_dir.join("db");
83        let db = sled::open(db_path)?;
84
85        let mut log_builder = Builder::new();
86        log_builder
87            .format(|buf, record| {
88                writeln!(
89                    buf,
90                    "[{} {} {}] {}",
91                    Local::now().format("%Y-%m-%d %H:%M:%S"),
92                    record.level(),
93                    record.target(),
94                    record.args(),
95                )
96            })
97            .filter_level(LevelFilter::Info)
98            .write_style(WriteStyle::Never)
99            .target(Target::Stdout);
100        if self.is_test {
101            let _ = log_builder.is_test(true).try_init();
102        } else {
103            log_builder.try_init()?;
104        }
105
106        let uds = UnixListener::bind(sock_path.as_path())?;
107        let uds_stream = UnixListenerStream::new(uds);
108        let service = proto::wacker_server::WackerServer::new(server::Server::new(db.clone(), logs_dir).await?)
109            .send_compressed(CompressionEncoding::Zstd)
110            .accept_compressed(CompressionEncoding::Zstd)
111            .send_compressed(CompressionEncoding::Gzip)
112            .accept_compressed(CompressionEncoding::Gzip);
113
114        info!("server listening on {:?}", sock_path.as_path());
115
116        let run = tonic::transport::Server::builder()
117            .add_service(service)
118            .serve_with_incoming_shutdown(uds_stream, async move {
119                shutdown.await;
120                info!("Shutting down the server");
121                if let Err(err) = remove_file(sock_path) {
122                    if err.kind() != ErrorKind::NotFound {
123                        warn!("failed to remove existing socket file: {}", err);
124                    }
125                }
126                if let Err(err) = db.flush_async().await {
127                    warn!("failed to flush the db: {}", err);
128                }
129            });
130
131        if self.is_test {
132            tokio::spawn(run);
133        } else {
134            run.await?;
135        }
136        Ok(())
137    }
138}
139
140pub async fn new_client() -> Result<Client<Channel>> {
141    new_client_with_path(get_main_dir()?.join("wacker.sock")).await
142}
143
144pub async fn new_client_with_path<P: AsRef<Path>>(sock_path: P) -> Result<Client<Channel>> {
145    let path = sock_path.as_ref().to_path_buf();
146    // We will ignore this uri because uds do not use it
147    let channel = Endpoint::try_from("unix://wacker")?
148        .connect_with_connector(service_fn(move |_| {
149            let path = path.clone();
150            async move {
151                // Connect to a Uds socket
152                Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
153            }
154        }))
155        .await?;
156
157    Ok(Client::new(channel)
158        .send_compressed(CompressionEncoding::Zstd)
159        .accept_compressed(CompressionEncoding::Zstd)
160        .accept_compressed(CompressionEncoding::Gzip))
161}