1use crate::{
2 error::ServerError, instance_pool::InstanceId, requests::NextEvent, state::ExtensionCache,
3};
4use cargo_lambda_metadata::{
5 cargo::load_metadata,
6 config::{ConfigOptions, FunctionNames, load_config_without_cli_flags},
7};
8use ignore::create_filter;
9use ignore_files::IgnoreFile;
10use std::{collections::HashMap, convert::Infallible, path::PathBuf, sync::Arc, time::Duration};
11use tracing::{debug, error, trace};
12use watchexec::{
13 ErrorHook, Watchexec,
14 action::{Action, Outcome, PreSpawn},
15 command::Command,
16 config::{InitConfig, RuntimeConfig},
17 error::RuntimeError,
18 event::{Event, Priority, ProcessEnd},
19 handler::SyncFnHandler,
20 signal::source::MainSignal,
21};
22
23pub(crate) mod ignore;
24
25#[derive(Clone, Debug, Default)]
26pub(crate) struct WatcherConfig {
27 pub runtime_api: String,
28 pub name: String,
29 pub bin_name: Option<String>,
30 pub base: PathBuf,
31 pub manifest_path: PathBuf,
32 pub ignore_files: Vec<IgnoreFile>,
33 pub ignore_changes: bool,
34 pub only_lambda_apis: bool,
35 pub env: HashMap<String, String>,
36 pub wait: bool,
37 pub instance_id: Option<InstanceId>,
38}
39
40impl WatcherConfig {
41 pub(crate) fn start_function(&self) -> bool {
42 !self.only_lambda_apis
43 }
44
45 pub(crate) fn send_function_init(&self) -> bool {
46 !self.only_lambda_apis && !self.wait
47 }
48}
49
50pub(crate) async fn new(
51 cmd: Command,
52 wc: WatcherConfig,
53 ext_cache: ExtensionCache,
54) -> Result<Arc<Watchexec>, ServerError> {
55 let init = crate::watcher::init();
56 let runtime = crate::watcher::runtime(cmd, wc, ext_cache).await?;
57
58 let wx = Watchexec::new(init, runtime)?;
59 wx.send_event(Event::default(), Priority::Urgent).await?;
60
61 Ok(wx)
62}
63
64fn init() -> InitConfig {
65 let mut config = InitConfig::default();
66 config.on_error(SyncFnHandler::from(
67 |err: ErrorHook| -> std::result::Result<(), Infallible> {
68 match err.error {
69 RuntimeError::IoError {
70 about: "waiting on process group",
73 ..
74 } => {}
75 RuntimeError::FsWatcher { .. } | RuntimeError::EventChannelTrySend { .. } => {
76 err.elevate()
77 }
78 e => {
79 error!(error = ?e, "internal error watching your project");
80 }
81 }
82
83 Ok(())
84 },
85 ));
86
87 config
88}
89
90async fn runtime(
91 cmd: Command,
92 wc: WatcherConfig,
93 ext_cache: ExtensionCache,
94) -> Result<RuntimeConfig, ServerError> {
95 let mut config = RuntimeConfig::default();
96
97 config.pathset([wc.base.clone()]);
98 config.commands(vec![cmd]);
99
100 config.filterer(create_filter(&wc.base, &wc.ignore_files, wc.ignore_changes).await?);
101
102 config.action_throttle(Duration::from_secs(3));
103
104 config.on_action(move |action: Action| {
105 let signals: Vec<MainSignal> = action.events.iter().flat_map(|e| e.signals()).collect();
106 let has_paths = action
107 .events
108 .iter()
109 .flat_map(|e| e.paths())
110 .next()
111 .is_some();
112
113 let empty_event = action
114 .events
115 .iter()
116 .map(|e| e.is_empty())
117 .next()
118 .unwrap_or_default();
119
120 debug!(
121 ?action,
122 ?signals,
123 has_paths,
124 empty_event,
125 "watcher action received"
126 );
127
128 let ext_cache = ext_cache.clone();
129 async move {
130 if signals.contains(&MainSignal::Terminate) {
131 let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
132 if let Some(delay) = function_shutdown_delay {
133 function_graceful_shutdown_or_else_sigkill(
134 action,
135 MainSignal::Terminate,
136 delay,
137 );
138 return Ok(());
139 }
140 action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
141 return Ok(());
142 }
143 if signals.contains(&MainSignal::Interrupt) {
144 let function_shutdown_delay = ext_cache.function_shutdown_delay().await;
145 if let Some(delay) = function_shutdown_delay {
146 function_graceful_shutdown_or_else_sigkill(
147 action,
148 MainSignal::Interrupt,
149 delay,
150 );
151 return Ok(());
152 }
153 action.outcome(Outcome::both(Outcome::Stop, Outcome::Exit));
154 return Ok(());
155 }
156
157 if !has_paths {
158 if !signals.is_empty() {
159 let mut out = Outcome::DoNothing;
160 for sig in signals {
161 out = Outcome::both(out, Outcome::Signal(sig));
162 }
163
164 action.outcome(out);
165 return Ok(());
166 }
167
168 let completion = action.events.iter().flat_map(|e| e.completions()).next();
169 if let Some(status) = completion {
170 match status {
171 Some(ProcessEnd::ExitError(sig)) => {
172 error!(code = ?sig, "command exited");
173 }
174 Some(ProcessEnd::ExitSignal(sig)) => {
175 error!(code = ?sig, "command killed");
176 }
177 Some(ProcessEnd::ExitStop(sig)) => {
178 error!(code = ?sig, "command stopped");
179 }
180 Some(ProcessEnd::Exception(sig)) => {
181 error!(code = ?sig, "command ended by exception");
182 }
183 _ => {}
184 };
185
186 action.outcome(Outcome::DoNothing);
187 return Ok(());
188 }
189 }
190
191 if !empty_event {
192 let event = NextEvent::shutdown("recompiling function");
193 ext_cache.send_event(event).await?;
194 }
195 let when_running = Outcome::both(Outcome::Stop, Outcome::Start);
196 action.outcome(Outcome::if_running(when_running, Outcome::Start));
197
198 Ok::<(), ServerError>(())
199 }
200 });
201
202 config.on_pre_spawn(move |prespawn: PreSpawn| {
203 let name = wc.name.clone();
204 let runtime_api = wc.runtime_api.clone();
205 let manifest_path = wc.manifest_path.clone();
206 let bin_name = wc.bin_name.clone();
207 let base_env = wc.env.clone();
208 let instance_id = wc.instance_id;
209
210 async move {
211 trace!("loading watch environment metadata");
212
213 let new_env = reload_env(&manifest_path, &bin_name);
214
215 if let Some(mut command) = prespawn.command().await {
216 command
217 .env("AWS_LAMBDA_FUNCTION_VERSION", "1")
218 .env("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "4096")
219 .envs(base_env)
220 .envs(new_env)
221 .env("AWS_LAMBDA_RUNTIME_API", &runtime_api)
222 .env("AWS_LAMBDA_FUNCTION_NAME", &name);
223
224 command.env("AWS_LAMBDA_MAX_CONCURRENCY", "1");
225
226 if let Some(id) = instance_id {
227 command.env("AWS_LAMBDA_INSTANCE_ID", id.to_string());
228 }
229 }
230
231 Ok::<(), Infallible>(())
232 }
233 });
234
235 Ok(config)
236}
237
238fn function_graceful_shutdown_or_else_sigkill(
239 action: Action,
240 signal_type: MainSignal,
241 max_delay: Duration,
242) {
243 tracing::debug!(
244 ?signal_type,
245 ?max_delay,
246 "attempting graceful function shutdown"
247 );
248 action.outcome(Outcome::both(
249 Outcome::Signal(signal_type),
251 Outcome::race(
253 Outcome::both(Outcome::Wait, Outcome::Exit),
255 Outcome::both(
257 Outcome::Sleep(max_delay),
258 Outcome::both(Outcome::Stop, Outcome::Exit),
259 ),
260 ),
261 ));
262}
263
264fn reload_env(manifest_path: &PathBuf, bin_name: &Option<String>) -> HashMap<String, String> {
265 let metadata = match load_metadata(manifest_path, None) {
266 Ok(metadata) => metadata,
267 Err(e) => {
268 error!("failed to reload metadata: {}", e);
269 return HashMap::new();
270 }
271 };
272
273 let options = ConfigOptions {
274 names: FunctionNames::new(None, bin_name.clone()),
275 ..Default::default()
276 };
277 let config = match load_config_without_cli_flags(&metadata, &options) {
278 Ok(config) => config,
279 Err(e) => {
280 error!("failed to reload config: {}", e);
281 return HashMap::new();
282 }
283 };
284
285 match config.watch.lambda_environment(&config.env) {
286 Ok(env) => env,
287 Err(e) => {
288 error!("failed to reload environment: {}", e);
289 HashMap::new()
290 }
291 }
292}