1use crate::{error::ServerError, requests::NextEvent, state::ExtensionCache};
2use cargo_lambda_metadata::{
3 cargo::load_metadata,
4 config::{ConfigOptions, FunctionNames, load_config_without_cli_flags},
5};
6use ignore::create_filter;
7use ignore_files::IgnoreFile;
8use std::{collections::HashMap, convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
9use tracing::{debug, error, trace};
10use watchexec::{
11 ErrorHook, Watchexec,
12 action::{Action, Outcome, PreSpawn},
13 command::Command,
14 config::{InitConfig, RuntimeConfig},
15 error::RuntimeError,
16 event::{Event, Priority, ProcessEnd},
17 handler::SyncFnHandler,
18 signal::source::MainSignal,
19};
20
21pub(crate) mod ignore;
22
23#[derive(Clone, Debug, Default)]
24pub(crate) struct WatcherConfig {
25 pub runtime_api: String,
26 pub name: String,
27 pub bin_name: Option<String>,
28 pub base: PathBuf,
29 pub manifest_path: PathBuf,
30 pub ignore_files: Vec<IgnoreFile>,
31 pub ignore_changes: bool,
32 pub only_lambda_apis: bool,
33 pub env: HashMap<String, String>,
34 pub wait: bool,
35}
36
37impl WatcherConfig {
38 pub(crate) fn start_function(&self) -> bool {
39 !self.only_lambda_apis
40 }
41
42 pub(crate) fn send_function_init(&self) -> bool {
43 !self.only_lambda_apis && !self.wait
44 }
45}
46
47pub(crate) async fn new(
48 cmd: Command,
49 wc: WatcherConfig,
50 ext_cache: ExtensionCache,
51) -> Result<Arc<Watchexec>, ServerError> {
52 let init = crate::watcher::init();
53 let runtime = crate::watcher::runtime(cmd, wc, ext_cache).await?;
54
55 let wx = Watchexec::new(init, runtime).map_err(ServerError::WatcherError)?;
56 wx.send_event(Event::default(), Priority::Urgent)
57 .await
58 .map_err(ServerError::WatcherError)?;
59
60 Ok(wx)
61}
62
63fn init() -> InitConfig {
64 let mut config = InitConfig::default();
65 config.on_error(SyncFnHandler::from(
66 |err: ErrorHook| -> std::result::Result<(), Infallible> {
67 match err.error {
68 RuntimeError::IoError {
69 about: "waiting on process group",
72 ..
73 } => {}
74 RuntimeError::FsWatcher { .. } | RuntimeError::EventChannelTrySend { .. } => {
75 err.elevate()
76 }
77 e => {
78 error!(error = ?e, "internal error watching your project");
79 }
80 }
81
82 Ok(())
83 },
84 ));
85
86 config
87}
88
89async fn runtime(
90 cmd: Command,
91 wc: WatcherConfig,
92 ext_cache: ExtensionCache,
93) -> Result<RuntimeConfig, ServerError> {
94 let mut config = RuntimeConfig::default();
95
96 config.pathset([wc.base.clone()]);
97 config.commands(vec![cmd]);
98
99 config.filterer(create_filter(&wc.base, &wc.ignore_files, wc.ignore_changes).await?);
100
101 config.action_throttle(Duration::from_secs(3));
102
103 config.on_action(move |action: Action| {
104 let signals: Vec<MainSignal> = action.events.iter().flat_map(|e| e.signals()).collect();
105 let has_paths = action
106 .events
107 .iter()
108 .flat_map(|e| e.paths())
109 .next()
110 .is_some();
111
112 let empty_event = action
113 .events
114 .iter()
115 .map(|e| e.is_empty())
116 .next()
117 .unwrap_or_default();
118
119 debug!(
120 ?action,
121 ?signals,
122 has_paths,
123 empty_event,
124 "watcher action received"
125 );
126
127 let ext_cache = ext_cache.clone();
128 async move {
129 if signals.contains(&MainSignal::Terminate) {
130 let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
131 if let Some(delay) = function_shutdown_delay {
132 function_graceful_shutdown_or_else_sigkill(
133 action,
134 MainSignal::Terminate,
135 delay,
136 );
137 return Ok(());
138 }
139 action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
140 return Ok(());
141 }
142 if signals.contains(&MainSignal::Interrupt) {
143 let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
144 if let Some(delay) = function_shutdown_delay {
145 function_graceful_shutdown_or_else_sigkill(
146 action,
147 MainSignal::Interrupt,
148 delay,
149 );
150 return Ok(());
151 }
152 action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
153 return Ok(());
154 }
155
156 if !has_paths {
157 if !signals.is_empty() {
158 let mut out = Outcome::DoNothing;
159 for sig in signals {
160 out = Outcome::both(out, Outcome::Signal(sig));
161 }
162
163 action.outcome(out);
164 return Ok(());
165 }
166
167 let completion = action.events.iter().flat_map(|e| e.completions()).next();
168 if let Some(status) = completion {
169 match status {
170 Some(ProcessEnd::ExitError(sig)) => {
171 error!(code = ?sig, "command exited");
172 }
173 Some(ProcessEnd::ExitSignal(sig)) => {
174 error!(code = ?sig, "command killed");
175 }
176 Some(ProcessEnd::ExitStop(sig)) => {
177 error!(code = ?sig, "command stopped");
178 }
179 Some(ProcessEnd::Exception(sig)) => {
180 error!(code = ?sig, "command ended by exception");
181 }
182 _ => {}
183 };
184
185 action.outcome(Outcome::DoNothing);
186 return Ok(());
187 }
188 }
189
190 if !empty_event {
191 let event = NextEvent::shutdown("recompiling function");
192 ext_cache.send_event(event).await?;
193 }
194 let when_running = Outcome::both(Outcome::Stop, Outcome::Start);
195 action.outcome(Outcome::if_running(when_running, Outcome::Start));
196
197 Ok::<(), ServerError>(())
198 }
199 });
200
201 config.on_pre_spawn(move |prespawn: PreSpawn| {
202 let name = wc.name.clone();
203 let runtime_api = wc.runtime_api.clone();
204 let manifest_path = wc.manifest_path.clone();
205 let bin_name = wc.bin_name.clone();
206 let base_env = wc.env.clone();
207
208 async move {
209 trace!("loading watch environment metadata");
210
211 let new_env = reload_env(&manifest_path, &bin_name);
212
213 if let Some(mut command) = prespawn.command().await {
214 command
215 .env("AWS_LAMBDA_FUNCTION_VERSION", "1")
216 .env("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "4096")
217 .envs(base_env)
218 .envs(new_env)
219 .env("AWS_LAMBDA_RUNTIME_API", &runtime_api)
220 .env("AWS_LAMBDA_FUNCTION_NAME", &name);
221 }
222
223 Ok::<(), Infallible>(())
224 }
225 });
226
227 Ok(config)
228}
229
230fn function_graceful_shutdown_or_else_sigkill(
231 action: Action,
232 signal_type: MainSignal,
233 max_delay: Duration,
234) {
235 tracing::debug!(
236 ?signal_type,
237 ?max_delay,
238 "attempting graceful function shutdown"
239 );
240 action.outcome(Outcome::both(
241 Outcome::Signal(signal_type),
243 Outcome::race(
245 Outcome::both(Outcome::Wait, Outcome::Exit),
247 Outcome::both(
249 Outcome::Sleep(max_delay),
250 Outcome::both(Outcome::Stop, Outcome::Exit),
251 ),
252 ),
253 ));
254}
255
256fn reload_env(manifest_path: &PathBuf, bin_name: &Option<String>) -> HashMap<String, String> {
257 let metadata = match load_metadata(manifest_path) {
258 Ok(metadata) => metadata,
259 Err(e) => {
260 error!("failed to reload metadata: {}", e);
261 return HashMap::new();
262 }
263 };
264
265 let options = ConfigOptions {
266 names: FunctionNames::new(None, bin_name.clone()),
267 ..Default::default()
268 };
269 let config = match load_config_without_cli_flags(&metadata, &options) {
270 Ok(config) => config,
271 Err(e) => {
272 error!("failed to reload config: {}", e);
273 return HashMap::new();
274 }
275 };
276
277 match config.watch.lambda_environment(&config.env) {
278 Ok(env) => env,
279 Err(e) => {
280 error!("failed to reload environment: {}", e);
281 HashMap::new()
282 }
283 }
284}