cargo_lambda_watch/
watcher.rs

1use crate::{error::ServerError, requests::NextEvent, state::ExtensionCache};
2use cargo_lambda_metadata::{
3    cargo::load_metadata,
4    config::{ConfigOptions, FunctionNames, load_config_without_cli_flags},
5};
6use ignore::create_filter;
7use ignore_files::IgnoreFile;
8use std::{collections::HashMap, convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
9use tracing::{debug, error, trace};
10use watchexec::{
11    ErrorHook, Watchexec,
12    action::{Action, Outcome, PreSpawn},
13    command::Command,
14    config::{InitConfig, RuntimeConfig},
15    error::RuntimeError,
16    event::{Event, Priority, ProcessEnd},
17    handler::SyncFnHandler,
18    signal::source::MainSignal,
19};
20
21pub(crate) mod ignore;
22
23#[derive(Clone, Debug, Default)]
24pub(crate) struct WatcherConfig {
25    pub runtime_api: String,
26    pub name: String,
27    pub bin_name: Option<String>,
28    pub base: PathBuf,
29    pub manifest_path: PathBuf,
30    pub ignore_files: Vec<IgnoreFile>,
31    pub ignore_changes: bool,
32    pub only_lambda_apis: bool,
33    pub env: HashMap<String, String>,
34    pub wait: bool,
35}
36
37impl WatcherConfig {
38    pub(crate) fn start_function(&self) -> bool {
39        !self.only_lambda_apis
40    }
41
42    pub(crate) fn send_function_init(&self) -> bool {
43        !self.only_lambda_apis && !self.wait
44    }
45}
46
47pub(crate) async fn new(
48    cmd: Command,
49    wc: WatcherConfig,
50    ext_cache: ExtensionCache,
51) -> Result<Arc<Watchexec>, ServerError> {
52    let init = crate::watcher::init();
53    let runtime = crate::watcher::runtime(cmd, wc, ext_cache).await?;
54
55    let wx = Watchexec::new(init, runtime).map_err(ServerError::WatcherError)?;
56    wx.send_event(Event::default(), Priority::Urgent)
57        .await
58        .map_err(ServerError::WatcherError)?;
59
60    Ok(wx)
61}
62
63fn init() -> InitConfig {
64    let mut config = InitConfig::default();
65    config.on_error(SyncFnHandler::from(
66        |err: ErrorHook| -> std::result::Result<(), Infallible> {
67            match err.error {
68                RuntimeError::IoError {
69                    // according to watchexec's documentation, this errors can be ignored.
70                    // see: https://github.com/watchexec/watchexec/blob/e06dc0dd16f8aa88a1556583eafbd985ca2c4eea/crates/lib/src/error/runtime.rs#L13-L15
71                    about: "waiting on process group",
72                    ..
73                } => {}
74                RuntimeError::FsWatcher { .. } | RuntimeError::EventChannelTrySend { .. } => {
75                    err.elevate()
76                }
77                e => {
78                    error!(error = ?e, "internal error watching your project");
79                }
80            }
81
82            Ok(())
83        },
84    ));
85
86    config
87}
88
89async fn runtime(
90    cmd: Command,
91    wc: WatcherConfig,
92    ext_cache: ExtensionCache,
93) -> Result<RuntimeConfig, ServerError> {
94    let mut config = RuntimeConfig::default();
95
96    config.pathset([wc.base.clone()]);
97    config.commands(vec![cmd]);
98
99    config.filterer(create_filter(&wc.base, &wc.ignore_files, wc.ignore_changes).await?);
100
101    config.action_throttle(Duration::from_secs(3));
102
103    config.on_action(move |action: Action| {
104        let signals: Vec<MainSignal> = action.events.iter().flat_map(|e| e.signals()).collect();
105        let has_paths = action
106            .events
107            .iter()
108            .flat_map(|e| e.paths())
109            .next()
110            .is_some();
111
112        let empty_event = action
113            .events
114            .iter()
115            .map(|e| e.is_empty())
116            .next()
117            .unwrap_or_default();
118
119        debug!(
120            ?action,
121            ?signals,
122            has_paths,
123            empty_event,
124            "watcher action received"
125        );
126
127        let ext_cache = ext_cache.clone();
128        async move {
129            if signals.contains(&MainSignal::Terminate) {
130                let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
131                if let Some(delay) = function_shutdown_delay {
132                    function_graceful_shutdown_or_else_sigkill(
133                        action,
134                        MainSignal::Terminate,
135                        delay,
136                    );
137                    return Ok(());
138                }
139                action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
140                return Ok(());
141            }
142            if signals.contains(&MainSignal::Interrupt) {
143                let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
144                if let Some(delay) = function_shutdown_delay {
145                    function_graceful_shutdown_or_else_sigkill(
146                        action,
147                        MainSignal::Interrupt,
148                        delay,
149                    );
150                    return Ok(());
151                }
152                action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
153                return Ok(());
154            }
155
156            if !has_paths {
157                if !signals.is_empty() {
158                    let mut out = Outcome::DoNothing;
159                    for sig in signals {
160                        out = Outcome::both(out, Outcome::Signal(sig));
161                    }
162
163                    action.outcome(out);
164                    return Ok(());
165                }
166
167                let completion = action.events.iter().flat_map(|e| e.completions()).next();
168                if let Some(status) = completion {
169                    match status {
170                        Some(ProcessEnd::ExitError(sig)) => {
171                            error!(code = ?sig, "command exited");
172                        }
173                        Some(ProcessEnd::ExitSignal(sig)) => {
174                            error!(code = ?sig, "command killed");
175                        }
176                        Some(ProcessEnd::ExitStop(sig)) => {
177                            error!(code = ?sig, "command stopped");
178                        }
179                        Some(ProcessEnd::Exception(sig)) => {
180                            error!(code = ?sig, "command ended by exception");
181                        }
182                        _ => {}
183                    };
184
185                    action.outcome(Outcome::DoNothing);
186                    return Ok(());
187                }
188            }
189
190            if !empty_event {
191                let event = NextEvent::shutdown("recompiling function");
192                ext_cache.send_event(event).await?;
193            }
194            let when_running = Outcome::both(Outcome::Stop, Outcome::Start);
195            action.outcome(Outcome::if_running(when_running, Outcome::Start));
196
197            Ok::<(), ServerError>(())
198        }
199    });
200
201    config.on_pre_spawn(move |prespawn: PreSpawn| {
202        let name = wc.name.clone();
203        let runtime_api = wc.runtime_api.clone();
204        let manifest_path = wc.manifest_path.clone();
205        let bin_name = wc.bin_name.clone();
206        let base_env = wc.env.clone();
207
208        async move {
209            trace!("loading watch environment metadata");
210
211            let new_env = reload_env(&manifest_path, &bin_name);
212
213            if let Some(mut command) = prespawn.command().await {
214                command
215                    .env("AWS_LAMBDA_FUNCTION_VERSION", "1")
216                    .env("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "4096")
217                    .envs(base_env)
218                    .envs(new_env)
219                    .env("AWS_LAMBDA_RUNTIME_API", &runtime_api)
220                    .env("AWS_LAMBDA_FUNCTION_NAME", &name);
221            }
222
223            Ok::<(), Infallible>(())
224        }
225    });
226
227    Ok(config)
228}
229
230fn function_graceful_shutdown_or_else_sigkill(
231    action: Action,
232    signal_type: MainSignal,
233    max_delay: Duration,
234) {
235    tracing::debug!(
236        ?signal_type,
237        ?max_delay,
238        "attempting graceful function shutdown"
239    );
240    action.outcome(Outcome::both(
241        // send sigterm
242        Outcome::Signal(signal_type),
243        // race graceful shutdown against forced shutdown following max delay
244        Outcome::race(
245            // happy path, process exits then watchexec exits
246            Outcome::both(Outcome::Wait, Outcome::Exit),
247            // unhappy path, we sleep max delay then SIGKILL and exit watchexec
248            Outcome::both(
249                Outcome::Sleep(max_delay),
250                Outcome::both(Outcome::Stop, Outcome::Exit),
251            ),
252        ),
253    ));
254}
255
256fn reload_env(manifest_path: &PathBuf, bin_name: &Option<String>) -> HashMap<String, String> {
257    let metadata = match load_metadata(manifest_path) {
258        Ok(metadata) => metadata,
259        Err(e) => {
260            error!("failed to reload metadata: {}", e);
261            return HashMap::new();
262        }
263    };
264
265    let options = ConfigOptions {
266        names: FunctionNames::new(None, bin_name.clone()),
267        ..Default::default()
268    };
269    let config = match load_config_without_cli_flags(&metadata, &options) {
270        Ok(config) => config,
271        Err(e) => {
272            error!("failed to reload config: {}", e);
273            return HashMap::new();
274        }
275    };
276
277    match config.watch.lambda_environment(&config.env) {
278        Ok(env) => env,
279        Err(e) => {
280            error!("failed to reload environment: {}", e);
281            HashMap::new()
282        }
283    }
284}