Skip to main content

cargo_lambda_watch/
watcher.rs

1use crate::{
2    error::ServerError, instance_pool::InstanceId, requests::NextEvent, state::ExtensionCache,
3};
4use cargo_lambda_metadata::{
5    cargo::load_metadata,
6    config::{ConfigOptions, FunctionNames, load_config_without_cli_flags},
7};
8use ignore::create_filter;
9use ignore_files::IgnoreFile;
10use std::{collections::HashMap, convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
11use tracing::{debug, error, trace};
12use watchexec::{
13    ErrorHook, Watchexec,
14    action::{Action, Outcome, PreSpawn},
15    command::Command,
16    config::{InitConfig, RuntimeConfig},
17    error::RuntimeError,
18    event::{Event, Priority, ProcessEnd},
19    handler::SyncFnHandler,
20    signal::source::MainSignal,
21};
22
23pub(crate) mod ignore;
24
25#[derive(Clone, Debug, Default)]
26pub(crate) struct WatcherConfig {
27    pub runtime_api: String,
28    pub name: String,
29    pub bin_name: Option<String>,
30    pub base: PathBuf,
31    pub manifest_path: PathBuf,
32    pub ignore_files: Vec<IgnoreFile>,
33    pub ignore_changes: bool,
34    pub only_lambda_apis: bool,
35    pub env: HashMap<String, String>,
36    pub wait: bool,
37    pub instance_id: Option<InstanceId>,
38}
39
40impl WatcherConfig {
41    pub(crate) fn start_function(&self) -> bool {
42        !self.only_lambda_apis
43    }
44
45    pub(crate) fn send_function_init(&self) -> bool {
46        !self.only_lambda_apis && !self.wait
47    }
48}
49
50pub(crate) async fn new(
51    cmd: Command,
52    wc: WatcherConfig,
53    ext_cache: ExtensionCache,
54) -> Result<Arc<Watchexec>, ServerError> {
55    let init = crate::watcher::init();
56    let runtime = crate::watcher::runtime(cmd, wc, ext_cache).await?;
57
58    let wx = Watchexec::new(init, runtime)?;
59    wx.send_event(Event::default(), Priority::Urgent).await?;
60
61    Ok(wx)
62}
63
64fn init() -> InitConfig {
65    let mut config = InitConfig::default();
66    config.on_error(SyncFnHandler::from(
67        |err: ErrorHook| -> std::result::Result<(), Infallible> {
68            match err.error {
69                RuntimeError::IoError {
70                    // according to watchexec's documentation, this errors can be ignored.
71                    // see: https://github.com/watchexec/watchexec/blob/e06dc0dd16f8aa88a1556583eafbd985ca2c4eea/crates/lib/src/error/runtime.rs#L13-L15
72                    about: "waiting on process group",
73                    ..
74                } => {}
75                RuntimeError::FsWatcher { .. } | RuntimeError::EventChannelTrySend { .. } => {
76                    err.elevate()
77                }
78                e => {
79                    error!(error = ?e, "internal error watching your project");
80                }
81            }
82
83            Ok(())
84        },
85    ));
86
87    config
88}
89
90async fn runtime(
91    cmd: Command,
92    wc: WatcherConfig,
93    ext_cache: ExtensionCache,
94) -> Result<RuntimeConfig, ServerError> {
95    let mut config = RuntimeConfig::default();
96
97    config.pathset([wc.base.clone()]);
98    config.commands(vec![cmd]);
99
100    config.filterer(create_filter(&wc.base, &wc.ignore_files, wc.ignore_changes).await?);
101
102    config.action_throttle(Duration::from_secs(3));
103
104    config.on_action(move |action: Action| {
105        let signals: Vec<MainSignal> = action.events.iter().flat_map(|e| e.signals()).collect();
106        let has_paths = action
107            .events
108            .iter()
109            .flat_map(|e| e.paths())
110            .next()
111            .is_some();
112
113        let empty_event = action
114            .events
115            .iter()
116            .map(|e| e.is_empty())
117            .next()
118            .unwrap_or_default();
119
120        debug!(
121            ?action,
122            ?signals,
123            has_paths,
124            empty_event,
125            "watcher action received"
126        );
127
128        let ext_cache = ext_cache.clone();
129        async move {
130            if signals.contains(&MainSignal::Terminate) {
131                let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
132                if let Some(delay) = function_shutdown_delay {
133                    function_graceful_shutdown_or_else_sigkill(
134                        action,
135                        MainSignal::Terminate,
136                        delay,
137                    );
138                    return Ok(());
139                }
140                action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
141                return Ok(());
142            }
143            if signals.contains(&MainSignal::Interrupt) {
144                let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
145                if let Some(delay) = function_shutdown_delay {
146                    function_graceful_shutdown_or_else_sigkill(
147                        action,
148                        MainSignal::Interrupt,
149                        delay,
150                    );
151                    return Ok(());
152                }
153                action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
154                return Ok(());
155            }
156
157            if !has_paths {
158                if !signals.is_empty() {
159                    let mut out = Outcome::DoNothing;
160                    for sig in signals {
161                        out = Outcome::both(out, Outcome::Signal(sig));
162                    }
163
164                    action.outcome(out);
165                    return Ok(());
166                }
167
168                let completion = action.events.iter().flat_map(|e| e.completions()).next();
169                if let Some(status) = completion {
170                    match status {
171                        Some(ProcessEnd::ExitError(sig)) => {
172                            error!(code = ?sig, "command exited");
173                        }
174                        Some(ProcessEnd::ExitSignal(sig)) => {
175                            error!(code = ?sig, "command killed");
176                        }
177                        Some(ProcessEnd::ExitStop(sig)) => {
178                            error!(code = ?sig, "command stopped");
179                        }
180                        Some(ProcessEnd::Exception(sig)) => {
181                            error!(code = ?sig, "command ended by exception");
182                        }
183                        _ => {}
184                    };
185
186                    action.outcome(Outcome::DoNothing);
187                    return Ok(());
188                }
189            }
190
191            if !empty_event {
192                let event = NextEvent::shutdown("recompiling function");
193                ext_cache.send_event(event).await?;
194            }
195            let when_running = Outcome::both(Outcome::Stop, Outcome::Start);
196            action.outcome(Outcome::if_running(when_running, Outcome::Start));
197
198            Ok::<(), ServerError>(())
199        }
200    });
201
202    config.on_pre_spawn(move |prespawn: PreSpawn| {
203        let name = wc.name.clone();
204        let runtime_api = wc.runtime_api.clone();
205        let manifest_path = wc.manifest_path.clone();
206        let bin_name = wc.bin_name.clone();
207        let base_env = wc.env.clone();
208        let instance_id = wc.instance_id;
209
210        async move {
211            trace!("loading watch environment metadata");
212
213            let new_env = reload_env(&manifest_path, &bin_name);
214
215            if let Some(mut command) = prespawn.command().await {
216                command
217                    .env("AWS_LAMBDA_FUNCTION_VERSION", "1")
218                    .env("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "4096")
219                    .envs(base_env)
220                    .envs(new_env)
221                    .env("AWS_LAMBDA_RUNTIME_API", &runtime_api)
222                    .env("AWS_LAMBDA_FUNCTION_NAME", &name);
223
224                command.env("AWS_LAMBDA_MAX_CONCURRENCY", "1");
225
226                if let Some(id) = instance_id {
227                    command.env("AWS_LAMBDA_INSTANCE_ID", id.to_string());
228                }
229            }
230
231            Ok::<(), Infallible>(())
232        }
233    });
234
235    Ok(config)
236}
237
238fn function_graceful_shutdown_or_else_sigkill(
239    action: Action,
240    signal_type: MainSignal,
241    max_delay: Duration,
242) {
243    tracing::debug!(
244        ?signal_type,
245        ?max_delay,
246        "attempting graceful function shutdown"
247    );
248    action.outcome(Outcome::both(
249        // send sigterm
250        Outcome::Signal(signal_type),
251        // race graceful shutdown against forced shutdown following max delay
252        Outcome::race(
253            // happy path, process exits then watchexec exits
254            Outcome::both(Outcome::Wait, Outcome::Exit),
255            // unhappy path, we sleep max delay then SIGKILL and exit watchexec
256            Outcome::both(
257                Outcome::Sleep(max_delay),
258                Outcome::both(Outcome::Stop, Outcome::Exit),
259            ),
260        ),
261    ));
262}
263
264fn reload_env(manifest_path: &PathBuf, bin_name: &Option<String>) -> HashMap<String, String> {
265    let metadata = match load_metadata(manifest_path, None) {
266        Ok(metadata) => metadata,
267        Err(e) => {
268            error!("failed to reload metadata: {}", e);
269            return HashMap::new();
270        }
271    };
272
273    let options = ConfigOptions {
274        names: FunctionNames::new(None, bin_name.clone()),
275        ..Default::default()
276    };
277    let config = match load_config_without_cli_flags(&metadata, &options) {
278        Ok(config) => config,
279        Err(e) => {
280            error!("failed to reload config: {}", e);
281            return HashMap::new();
282        }
283    };
284
285    match config.watch.lambda_environment(&config.env) {
286        Ok(env) => env,
287        Err(e) => {
288            error!("failed to reload environment: {}", e);
289            HashMap::new()
290        }
291    }
292}