1use serde::{de::DeserializeOwned, Serialize};
10use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
11use tokio::sync::{Mutex, RwLock};
12#[cfg(feature = "grpc")]
13use tokio_stream::StreamExt as _;
14use tracing::{debug, error, info, warn};
15
16use crate::error::{ErrorData, Result};
17use crate::wait_until::WaitUntilContext;
18use crate::{BindingsMode, BindingsProvider, BindingsProviderApi, WaitUntil};
19use alien_error::{AlienError, Context, IntoAlienError};
20
21#[cfg(feature = "grpc")]
22use crate::grpc::control_service::alien_bindings::control::{
23 control_service_client::ControlServiceClient, send_task_result_request::Result as TaskResult,
24 task::Payload as TaskPayload, RegisterEventHandlerRequest, RegisterHttpServerRequest,
25 SendTaskResultRequest, Task, TaskError, TaskSuccess, WaitForTasksRequest,
26};
27
28#[derive(Debug, Clone)]
30pub struct StorageEvent {
31 pub key: String,
32 pub event_type: String,
33 pub bucket: String,
34 pub size: u64,
35 pub content_type: String,
36}
37
38#[derive(Debug, Clone)]
40pub struct CronEvent {
41 pub schedule_name: String,
42 pub scheduled_time: chrono::DateTime<chrono::Utc>,
43}
44
45#[derive(Debug, Clone)]
47pub struct QueueMessage {
48 pub id: String,
49 pub payload: Vec<u8>,
50 pub receipt_handle: String,
51 pub source: String,
52 pub attempt_count: u32,
53}
54
55type StorageHandler =
57 Box<dyn Fn(StorageEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
58type CronHandler =
59 Box<dyn Fn(CronEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
60type QueueHandler =
61 Box<dyn Fn(QueueMessage) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
62type CommandHandler =
63 Box<dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>> + Send + Sync>;
64
65struct Handlers {
67 storage: HashMap<String, StorageHandler>,
68 cron: HashMap<String, CronHandler>,
69 queue: HashMap<String, QueueHandler>,
70 command: HashMap<String, CommandHandler>,
71}
72
73impl Default for Handlers {
74 fn default() -> Self {
75 Self {
76 storage: HashMap::new(),
77 cron: HashMap::new(),
78 queue: HashMap::new(),
79 command: HashMap::new(),
80 }
81 }
82}
83
84pub struct AlienContext {
90 wait_until_context: Arc<WaitUntilContext>,
92 bindings_provider: Arc<dyn BindingsProviderApi>,
94 app_id: String,
96 env_vars: HashMap<String, String>,
98 handlers: Arc<RwLock<Handlers>>,
100 #[cfg(feature = "grpc")]
102 control_client: Arc<Mutex<Option<ControlServiceClient<tonic::transport::Channel>>>>,
103}
104
105impl AlienContext {
106 pub async fn from_env() -> Result<Self> {
109 Self::from_env_with_vars(&std::env::vars().collect()).await
110 }
111
112 pub async fn from_env_with_vars(env_vars: &HashMap<String, String>) -> Result<Self> {
115 let bindings_mode = crate::get_bindings_mode_from_env(env_vars)?;
117
118 let bindings_provider: Arc<dyn BindingsProviderApi> = match bindings_mode {
119 BindingsMode::Grpc => {
120 #[cfg(feature = "grpc")]
121 {
122 use crate::providers::grpc_provider::GrpcBindingsProvider;
123 Arc::new(GrpcBindingsProvider::new_with_env(env_vars.clone())?)
124 }
125 #[cfg(not(feature = "grpc"))]
126 {
127 return Err(AlienError::new(ErrorData::FeatureNotEnabled {
128 feature: "grpc".to_string(),
129 }));
130 }
131 }
132 BindingsMode::Direct => Arc::new(BindingsProvider::from_env(env_vars.clone()).await?),
133 };
134
135 let app_id = uuid::Uuid::new_v4().to_string();
136
137 let wait_until_context =
138 Arc::new(WaitUntilContext::from_env_with_vars(Some(app_id.clone()), env_vars).await?);
139
140 wait_until_context.start_drain_listener().await?;
142
143 Ok(Self {
144 wait_until_context,
145 bindings_provider,
146 app_id,
147 env_vars: env_vars.clone(),
148 handlers: Arc::new(RwLock::new(Handlers::default())),
149 #[cfg(feature = "grpc")]
150 control_client: Arc::new(Mutex::new(None)),
151 })
152 }
153
154 pub fn new(
157 wait_until_context: Arc<WaitUntilContext>,
158 bindings_provider: Arc<dyn BindingsProviderApi>,
159 ) -> Self {
160 Self {
161 app_id: wait_until_context.application_id().to_string(),
162 wait_until_context,
163 bindings_provider,
164 env_vars: std::env::vars().collect(),
165 handlers: Arc::new(RwLock::new(Handlers::default())),
166 #[cfg(feature = "grpc")]
167 control_client: Arc::new(Mutex::new(None)),
168 }
169 }
170
171 #[cfg(feature = "grpc")]
173 async fn get_control_client(&self) -> Result<ControlServiceClient<tonic::transport::Channel>> {
174 let mut client_guard = self.control_client.lock().await;
175
176 if let Some(client) = client_guard.as_ref() {
177 return Ok(client.clone());
178 }
179
180 let grpc_address = self
181 .env_vars
182 .get("ALIEN_BINDINGS_GRPC_ADDRESS")
183 .ok_or_else(|| {
184 AlienError::new(ErrorData::EnvironmentVariableMissing {
185 variable_name: "ALIEN_BINDINGS_GRPC_ADDRESS".to_string(),
186 })
187 })?;
188
189 let endpoint = format!("http://{}", grpc_address);
190 let channel = tonic::transport::Channel::from_shared(endpoint.clone())
191 .into_alien_error()
192 .context(ErrorData::GrpcConnectionFailed {
193 endpoint: endpoint.clone(),
194 reason: "Invalid gRPC endpoint format".to_string(),
195 })?
196 .connect()
197 .await
198 .into_alien_error()
199 .context(ErrorData::GrpcConnectionFailed {
200 endpoint,
201 reason: "Failed to connect to gRPC server".to_string(),
202 })?;
203
204 let client = ControlServiceClient::new(channel);
205 *client_guard = Some(client.clone());
206 Ok(client)
207 }
208
209 pub fn bindings(&self) -> &dyn BindingsProviderApi {
213 self.bindings_provider.as_ref()
214 }
215
216 pub fn get_bindings(&self) -> Arc<dyn BindingsProviderApi> {
218 Arc::clone(&self.bindings_provider)
219 }
220
221 pub fn on_storage_event<F, Fut>(&self, resource: &str, handler: F)
233 where
234 F: Fn(StorageEvent) -> Fut + Send + Sync + 'static,
235 Fut: Future<Output = Result<()>> + Send + 'static,
236 {
237 let resource = resource.to_string();
238 let handler = Box::new(move |event: StorageEvent| {
239 let fut = handler(event);
240 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
241 });
242
243 let handlers = self.handlers.clone();
244 let resource_clone = resource.clone();
245
246 tokio::spawn(async move {
248 let mut h = handlers.write().await;
249 h.storage.insert(resource_clone, handler);
250 });
251
252 info!(resource = %resource, "Registered storage event handler");
253 }
254
255 pub fn on_cron_event<F, Fut>(&self, schedule: &str, handler: F)
265 where
266 F: Fn(CronEvent) -> Fut + Send + Sync + 'static,
267 Fut: Future<Output = Result<()>> + Send + 'static,
268 {
269 let schedule = schedule.to_string();
270 let handler = Box::new(move |event: CronEvent| {
271 let fut = handler(event);
272 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
273 });
274
275 let handlers = self.handlers.clone();
276 let schedule_clone = schedule.clone();
277
278 tokio::spawn(async move {
279 let mut h = handlers.write().await;
280 h.cron.insert(schedule_clone, handler);
281 });
282
283 info!(schedule = %schedule, "Registered cron event handler");
284 }
285
286 pub fn on_queue_message<F, Fut>(&self, queue: &str, handler: F)
296 where
297 F: Fn(QueueMessage) -> Fut + Send + Sync + 'static,
298 Fut: Future<Output = Result<()>> + Send + 'static,
299 {
300 let queue = queue.to_string();
301 let handler = Box::new(move |message: QueueMessage| {
302 let fut = handler(message);
303 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
304 });
305
306 let handlers = self.handlers.clone();
307 let queue_clone = queue.clone();
308
309 tokio::spawn(async move {
310 let mut h = handlers.write().await;
311 h.queue.insert(queue_clone, handler);
312 });
313
314 info!(queue = %queue, "Registered queue message handler");
315 }
316
317 pub fn on_command<P, R, F, Fut>(&self, command: &str, handler: F)
327 where
328 P: DeserializeOwned + Send + 'static,
329 R: Serialize + Send + 'static,
330 F: Fn(P) -> Fut + Send + Sync + 'static,
331 Fut: Future<Output = Result<R>> + Send + 'static,
332 {
333 let command = command.to_string();
334 let handler = Box::new(move |params_bytes: Vec<u8>| {
335 let params: P = match serde_json::from_slice(¶ms_bytes) {
337 Ok(p) => p,
338 Err(e) => {
339 return Box::pin(async move {
340 Err(AlienError::new(ErrorData::DeserializationFailed {
341 message: format!("Failed to deserialize command params: {}", e),
342 type_name: std::any::type_name::<P>().to_string(),
343 }))
344 })
345 as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>;
346 }
347 };
348
349 let fut = handler(params);
350 Box::pin(async move {
351 let result = fut.await?;
352 serde_json::to_vec(&result).into_alien_error().context(
353 ErrorData::SerializationFailed {
354 message: "Failed to serialize command result".to_string(),
355 },
356 )
357 }) as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>
358 });
359
360 let handlers = self.handlers.clone();
361 let command_clone = command.clone();
362
363 tokio::spawn(async move {
364 let mut h = handlers.write().await;
365 h.command.insert(command_clone, handler);
366 });
367
368 info!(command = %command, "Registered command handler");
369 }
370
371 #[cfg(feature = "grpc")]
383 pub async fn register_http_server(&self, port: u16) -> Result<()> {
384 info!(port = port, "Registering HTTP server with runtime");
385
386 let mut client = self.get_control_client().await?;
387
388 let request = tonic::Request::new(RegisterHttpServerRequest { port: port as u32 });
389
390 client
391 .register_http_server(request)
392 .await
393 .into_alien_error()
394 .context(ErrorData::GrpcCallFailed {
395 service: "ControlService".to_string(),
396 method: "RegisterHttpServer".to_string(),
397 reason: "gRPC call failed".to_string(),
398 })?;
399
400 info!(port = port, "HTTP server registered successfully");
401 Ok(())
402 }
403
404 #[cfg(not(feature = "grpc"))]
405 pub async fn register_http_server(&self, _port: u16) -> Result<()> {
406 Err(AlienError::new(ErrorData::FeatureNotEnabled {
407 feature: "grpc".to_string(),
408 }))
409 }
410
411 #[cfg(feature = "grpc")]
425 pub async fn run(&self) -> Result<()> {
426 info!(app_id = %self.app_id, "Entering event loop");
427
428 self.register_handlers_with_runtime().await?;
430
431 let mut client = self.get_control_client().await?;
433
434 let request = tonic::Request::new(WaitForTasksRequest {
435 application_id: self.app_id.clone(),
436 });
437
438 let mut stream = client
439 .wait_for_tasks(request)
440 .await
441 .into_alien_error()
442 .context(ErrorData::GrpcCallFailed {
443 service: "ControlService".to_string(),
444 method: "WaitForTasks".to_string(),
445 reason: "Failed to start task stream".to_string(),
446 })?
447 .into_inner();
448
449 info!("Task stream established, waiting for tasks");
450
451 while let Some(task_result) = stream.next().await {
453 match task_result {
454 Ok(task) => {
455 if let Err(e) = self.handle_task(task).await {
456 error!(error = %e, "Error handling task");
457 }
458 }
459 Err(status) => {
460 if status.code() == tonic::Code::Cancelled {
461 info!("Task stream cancelled, shutting down");
462 break;
463 }
464 error!(error = %status, "Error receiving task from stream");
465 }
466 }
467 }
468
469 info!("Task loop ended");
470 Ok(())
471 }
472
473 #[cfg(not(feature = "grpc"))]
474 pub async fn run(&self) -> Result<()> {
475 Err(AlienError::new(ErrorData::FeatureNotEnabled {
476 feature: "grpc".to_string(),
477 }))
478 }
479
480 #[cfg(feature = "grpc")]
482 async fn register_handlers_with_runtime(&self) -> Result<()> {
483 let handlers = self.handlers.read().await;
484 let mut client = self.get_control_client().await?;
485
486 for resource in handlers.storage.keys() {
488 let request = tonic::Request::new(RegisterEventHandlerRequest {
489 handler_type: "storage".to_string(),
490 resource_name: resource.clone(),
491 });
492 client
493 .register_event_handler(request)
494 .await
495 .into_alien_error()
496 .context(ErrorData::GrpcCallFailed {
497 service: "ControlService".to_string(),
498 method: "RegisterEventHandler".to_string(),
499 reason: "Failed to register storage handler".to_string(),
500 })?;
501 debug!(handler_type = "storage", resource = %resource, "Registered handler with runtime");
502 }
503
504 for schedule in handlers.cron.keys() {
506 let request = tonic::Request::new(RegisterEventHandlerRequest {
507 handler_type: "cron".to_string(),
508 resource_name: schedule.clone(),
509 });
510 client
511 .register_event_handler(request)
512 .await
513 .into_alien_error()
514 .context(ErrorData::GrpcCallFailed {
515 service: "ControlService".to_string(),
516 method: "RegisterEventHandler".to_string(),
517 reason: "Failed to register cron handler".to_string(),
518 })?;
519 debug!(handler_type = "cron", resource = %schedule, "Registered handler with runtime");
520 }
521
522 for queue in handlers.queue.keys() {
524 let request = tonic::Request::new(RegisterEventHandlerRequest {
525 handler_type: "queue".to_string(),
526 resource_name: queue.clone(),
527 });
528 client
529 .register_event_handler(request)
530 .await
531 .into_alien_error()
532 .context(ErrorData::GrpcCallFailed {
533 service: "ControlService".to_string(),
534 method: "RegisterEventHandler".to_string(),
535 reason: "Failed to register queue handler".to_string(),
536 })?;
537 debug!(handler_type = "queue", resource = %queue, "Registered handler with runtime");
538 }
539
540 for command in handlers.command.keys() {
542 let request = tonic::Request::new(RegisterEventHandlerRequest {
543 handler_type: "command".to_string(),
544 resource_name: command.clone(),
545 });
546 client
547 .register_event_handler(request)
548 .await
549 .into_alien_error()
550 .context(ErrorData::GrpcCallFailed {
551 service: "ControlService".to_string(),
552 method: "RegisterEventHandler".to_string(),
553 reason: "Failed to register command handler".to_string(),
554 })?;
555 debug!(handler_type = "command", resource = %command, "Registered handler with runtime");
556 }
557
558 Ok(())
559 }
560
561 #[cfg(feature = "grpc")]
563 async fn handle_task(&self, task: Task) -> Result<()> {
564 let task_id = task.task_id.clone();
565 debug!(task_id = %task_id, "Handling task");
566
567 let is_command = matches!(&task.payload, Some(TaskPayload::ArcCommand(_)));
569
570 let result = match task.payload {
571 Some(TaskPayload::StorageEvent(se)) => {
572 self.handle_storage_event(
573 &se.bucket,
574 StorageEvent {
575 key: se.key,
576 event_type: se.event_type,
577 bucket: se.bucket.clone(),
578 size: se.size,
579 content_type: se.content_type,
580 },
581 )
582 .await
583 }
584 Some(TaskPayload::CronEvent(ce)) => {
585 let scheduled_time = ce
586 .scheduled_time
587 .map(|ts| {
588 chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
589 .unwrap_or_else(chrono::Utc::now)
590 })
591 .unwrap_or_else(chrono::Utc::now);
592
593 self.handle_cron_event(
594 &ce.schedule_name,
595 CronEvent {
596 schedule_name: ce.schedule_name.clone(),
597 scheduled_time,
598 },
599 )
600 .await
601 }
602 Some(TaskPayload::QueueMessage(qm)) => {
603 self.handle_queue_message(
604 &qm.source,
605 QueueMessage {
606 id: qm.id,
607 payload: qm.payload,
608 receipt_handle: qm.receipt_handle,
609 source: qm.source.clone(),
610 attempt_count: qm.attempt_count,
611 },
612 )
613 .await
614 }
615 Some(TaskPayload::ArcCommand(cmd)) => {
616 self.handle_command(&task_id, &cmd.command_name, cmd.params, &cmd.response_url)
617 .await
618 }
619 None => {
620 warn!(task_id = %task_id, "Received task with no payload");
621 Ok(())
622 }
623 };
624
625 if !is_command {
627 self.send_task_result(&task_id, result).await?;
628 }
629
630 Ok(())
631 }
632
633 async fn handle_storage_event(&self, bucket: &str, event: StorageEvent) -> Result<()> {
634 let handlers = self.handlers.read().await;
635 if let Some(handler) = handlers
637 .storage
638 .get(bucket)
639 .or_else(|| handlers.storage.get("*"))
640 {
641 handler(event).await
642 } else {
643 warn!(bucket = %bucket, "No handler registered for storage event");
644 Ok(())
645 }
646 }
647
648 async fn handle_cron_event(&self, schedule: &str, event: CronEvent) -> Result<()> {
649 let handlers = self.handlers.read().await;
650 if let Some(handler) = handlers
652 .cron
653 .get(schedule)
654 .or_else(|| handlers.cron.get("*"))
655 {
656 handler(event).await
657 } else {
658 warn!(schedule = %schedule, "No handler registered for cron event");
659 Ok(())
660 }
661 }
662
663 async fn handle_queue_message(&self, queue: &str, message: QueueMessage) -> Result<()> {
664 let handlers = self.handlers.read().await;
665 if let Some(handler) = handlers
667 .queue
668 .get(queue)
669 .or_else(|| handlers.queue.get("*"))
670 {
671 handler(message).await
672 } else {
673 warn!(queue = %queue, "No handler registered for queue message");
674 Ok(())
675 }
676 }
677
678 #[cfg(feature = "grpc")]
679 async fn handle_command(
680 &self,
681 event_id: &str,
682 command: &str,
683 params: Vec<u8>,
684 _response_url: &str,
685 ) -> Result<()> {
686 let handlers = self.handlers.read().await;
687
688 let result = if let Some(handler) = handlers.command.get(command) {
689 match handler(params).await {
690 Ok(response_data) => {
691 self.send_command_response(event_id, Ok(response_data))
693 .await
694 }
695 Err(e) => {
696 self.send_command_response(event_id, Err(e.to_string()))
698 .await
699 }
700 }
701 } else {
702 warn!(command = %command, "No handler registered for command");
703 self.send_command_response(
704 event_id,
705 Err(format!("No handler for command: {}", command)),
706 )
707 .await
708 };
709
710 result
711 }
712
713 #[cfg(feature = "grpc")]
714 async fn send_task_result(&self, task_id: &str, result: Result<()>) -> Result<()> {
715 let mut client = self.get_control_client().await?;
716
717 let task_result = match result {
718 Ok(()) => TaskResult::Success(TaskSuccess {
719 response_data: vec![],
720 }),
721 Err(e) => TaskResult::Error(TaskError {
722 code: "HANDLER_ERROR".to_string(),
723 message: e.to_string(),
724 }),
725 };
726
727 let request = tonic::Request::new(SendTaskResultRequest {
728 task_id: task_id.to_string(),
729 result: Some(task_result),
730 });
731
732 client
733 .send_task_result(request)
734 .await
735 .into_alien_error()
736 .context(ErrorData::GrpcCallFailed {
737 service: "ControlService".to_string(),
738 method: "SendTaskResult".to_string(),
739 reason: "Failed to send task result".to_string(),
740 })?;
741
742 Ok(())
743 }
744
745 #[cfg(feature = "grpc")]
746 async fn send_command_response(
747 &self,
748 task_id: &str,
749 result: std::result::Result<Vec<u8>, String>,
750 ) -> Result<()> {
751 let mut client = self.get_control_client().await?;
752
753 let task_result = match result {
754 Ok(data) => TaskResult::Success(TaskSuccess {
755 response_data: data,
756 }),
757 Err(e) => TaskResult::Error(TaskError {
758 code: "COMMAND_ERROR".to_string(),
759 message: e,
760 }),
761 };
762
763 let request = tonic::Request::new(SendTaskResultRequest {
764 task_id: task_id.to_string(),
765 result: Some(task_result),
766 });
767
768 client
769 .send_task_result(request)
770 .await
771 .into_alien_error()
772 .context(ErrorData::GrpcCallFailed {
773 service: "ControlService".to_string(),
774 method: "SendTaskResult".to_string(),
775 reason: "Failed to send command response".to_string(),
776 })?;
777
778 eprintln!("[ALIEN_CONTEXT] send_task_result succeeded");
779 Ok(())
780 }
781
782 pub fn wait_until<F, Fut>(&self, task_fn: F) -> Result<()>
787 where
788 F: FnOnce() -> Fut + Send + 'static,
789 Fut: std::future::Future<Output = ()> + Send + 'static,
790 {
791 self.wait_until_context.wait_until(task_fn)
792 }
793
794 pub fn application_id(&self) -> &str {
798 &self.app_id
799 }
800
801 pub async fn get_task_count(&self) -> Result<u32> {
803 self.wait_until_context.get_task_count().await
804 }
805
806 pub async fn get_current_function(&self) -> Result<Option<Arc<dyn crate::traits::Function>>> {
808 if let Some(current_function_name) =
809 self.env_vars.get("ALIEN_CURRENT_FUNCTION_BINDING_NAME")
810 {
811 Ok(Some(
812 self.bindings_provider
813 .load_function(current_function_name)
814 .await?,
815 ))
816 } else {
817 Ok(None)
818 }
819 }
820
821 pub async fn get_current_container(&self) -> Result<Option<Arc<dyn crate::traits::Container>>> {
835 if let Some(current_container_name) =
836 self.env_vars.get("ALIEN_CURRENT_CONTAINER_BINDING_NAME")
837 {
838 Ok(Some(
839 self.bindings_provider
840 .load_container(current_container_name)
841 .await?,
842 ))
843 } else {
844 Ok(None)
845 }
846 }
847}