1use serde::{de::DeserializeOwned, Serialize};
10use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
11use tokio::sync::{Mutex, RwLock};
12#[cfg(feature = "grpc")]
13use tokio_stream::StreamExt as _;
14use tracing::{debug, error, info, warn};
15
16use crate::error::{ErrorData, Result};
17use crate::wait_until::WaitUntilContext;
18use crate::{BindingsMode, BindingsProvider, BindingsProviderApi, WaitUntil};
19use alien_core::{ENV_ALIEN_CURRENT_CONTAINER_BINDING_NAME, ENV_ALIEN_CURRENT_WORKER_BINDING_NAME};
20use alien_error::{AlienError, Context, IntoAlienError};
21
22#[cfg(feature = "grpc")]
23use crate::grpc::control_service::alien_bindings::control::{
24 control_service_client::ControlServiceClient, send_task_result_request::Result as TaskResult,
25 task::Payload as TaskPayload, RegisterEventHandlerRequest, RegisterHttpServerRequest,
26 SendTaskResultRequest, Task, TaskError, TaskSuccess, WaitForTasksRequest,
27};
28
/// Storage notification passed to handlers registered with
/// `AlienContext::on_storage_event`.
///
/// The task loop fills every field from the storage-event task payload of the
/// same name; `bucket` is additionally the key used to select the handler.
///
/// `PartialEq`/`Eq` are derived so events can be compared directly (for
/// example with `assert_eq!` in handler tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageEvent {
    /// Object key as reported in the task payload.
    pub key: String,
    /// Event type string, passed through unmodified from the task payload.
    pub event_type: String,
    /// Bucket the event belongs to; also used for handler lookup.
    pub bucket: String,
    /// Size reported in the task payload (presumably bytes — TODO confirm
    /// against the runtime).
    pub size: u64,
    /// Content type string as reported in the task payload.
    pub content_type: String,
}
38
39#[derive(Debug, Clone)]
41pub struct CronEvent {
42 pub schedule_name: String,
43 pub scheduled_time: chrono::DateTime<chrono::Utc>,
44}
45
/// Queue message passed to handlers registered with
/// `AlienContext::on_queue_message`.
///
/// The task loop fills every field from the queue-message task payload of the
/// same name; `source` is additionally the key used to select the handler.
///
/// `PartialEq`/`Eq` are derived so messages can be compared directly (for
/// example with `assert_eq!` in handler tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueMessage {
    /// Message identifier as reported in the task payload.
    pub id: String,
    /// Raw message body.
    pub payload: Vec<u8>,
    /// Receipt handle as reported in the task payload.
    pub receipt_handle: String,
    /// Source the message came from; also used for handler lookup.
    pub source: String,
    /// Attempt counter as reported in the task payload.
    pub attempt_count: u32,
}
55
56type StorageHandler =
58 Box<dyn Fn(StorageEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
59type CronHandler =
60 Box<dyn Fn(CronEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
61type QueueHandler =
62 Box<dyn Fn(QueueMessage) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
63type CommandHandler =
64 Box<dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>> + Send + Sync>;
65
66struct Handlers {
68 storage: HashMap<String, StorageHandler>,
69 cron: HashMap<String, CronHandler>,
70 queue: HashMap<String, QueueHandler>,
71 command: HashMap<String, CommandHandler>,
72}
73
74impl Default for Handlers {
75 fn default() -> Self {
76 Self {
77 storage: HashMap::new(),
78 cron: HashMap::new(),
79 queue: HashMap::new(),
80 command: HashMap::new(),
81 }
82 }
83}
84
/// Application-facing context: gives access to bindings, tracks background
/// work, stores event/command handlers and (with the `grpc` feature) drives
/// the task loop against the runtime's control service.
pub struct AlienContext {
    /// Backs `wait_until` and `get_task_count`.
    wait_until_context: Arc<WaitUntilContext>,
    /// Provider handed out by `bindings` and `get_bindings`.
    bindings_provider: Arc<dyn BindingsProviderApi>,
    /// Application id: a freshly generated UUID in `from_env_with_vars`, or
    /// the id of the supplied wait-until context in `new`.
    app_id: String,
    /// Environment snapshot consulted for the gRPC address and for the
    /// current worker / container binding names.
    env_vars: HashMap<String, String>,
    /// Handlers registered through the `on_*` methods.
    handlers: Arc<RwLock<Handlers>>,
    /// Control-service client, created on first use by `get_control_client`.
    #[cfg(feature = "grpc")]
    control_client: Arc<Mutex<Option<ControlServiceClient<tonic::transport::Channel>>>>,
}
105
106impl AlienContext {
107 pub async fn from_env() -> Result<Self> {
110 Self::from_env_with_vars(&std::env::vars().collect()).await
111 }
112
113 pub async fn from_env_with_vars(env_vars: &HashMap<String, String>) -> Result<Self> {
116 let bindings_mode = crate::get_bindings_mode_from_env(env_vars)?;
118
119 let bindings_provider: Arc<dyn BindingsProviderApi> = match bindings_mode {
120 BindingsMode::Grpc => {
121 #[cfg(feature = "grpc")]
122 {
123 use crate::providers::grpc_provider::GrpcBindingsProvider;
124 Arc::new(GrpcBindingsProvider::new_with_env(env_vars.clone())?)
125 }
126 #[cfg(not(feature = "grpc"))]
127 {
128 return Err(AlienError::new(ErrorData::FeatureNotEnabled {
129 feature: "grpc".to_string(),
130 }));
131 }
132 }
133 BindingsMode::Direct => Arc::new(BindingsProvider::from_env(env_vars.clone()).await?),
134 };
135
136 let app_id = uuid::Uuid::new_v4().to_string();
137
138 let wait_until_context =
139 Arc::new(WaitUntilContext::from_env_with_vars(Some(app_id.clone()), env_vars).await?);
140
141 wait_until_context.start_drain_listener().await?;
143
144 Ok(Self {
145 wait_until_context,
146 bindings_provider,
147 app_id,
148 env_vars: env_vars.clone(),
149 handlers: Arc::new(RwLock::new(Handlers::default())),
150 #[cfg(feature = "grpc")]
151 control_client: Arc::new(Mutex::new(None)),
152 })
153 }
154
155 pub fn new(
158 wait_until_context: Arc<WaitUntilContext>,
159 bindings_provider: Arc<dyn BindingsProviderApi>,
160 ) -> Self {
161 Self {
162 app_id: wait_until_context.application_id().to_string(),
163 wait_until_context,
164 bindings_provider,
165 env_vars: std::env::vars().collect(),
166 handlers: Arc::new(RwLock::new(Handlers::default())),
167 #[cfg(feature = "grpc")]
168 control_client: Arc::new(Mutex::new(None)),
169 }
170 }
171
/// Returns a control-service client, connecting on first use.
///
/// The client is cached behind the `control_client` mutex; the mutex stays
/// locked for the whole connection attempt, so concurrent callers wait for a
/// single connect rather than racing. The address is read from
/// `ALIEN_BINDINGS_GRPC_ADDRESS` and prefixed with `http://`.
///
/// # Errors
///
/// `EnvironmentVariableMissing` when the address variable is absent, or
/// `GrpcConnectionFailed` when the endpoint is malformed or unreachable.
#[cfg(feature = "grpc")]
async fn get_control_client(&self) -> Result<ControlServiceClient<tonic::transport::Channel>> {
    const ADDRESS_VAR: &str = "ALIEN_BINDINGS_GRPC_ADDRESS";

    let mut cached = self.control_client.lock().await;
    if let Some(existing) = &*cached {
        return Ok(existing.clone());
    }

    let address = match self.env_vars.get(ADDRESS_VAR) {
        Some(address) => address,
        None => {
            return Err(AlienError::new(ErrorData::EnvironmentVariableMissing {
                variable_name: ADDRESS_VAR.to_string(),
            }));
        }
    };

    let endpoint = format!("http://{}", address);

    let unconnected = tonic::transport::Channel::from_shared(endpoint.clone())
        .into_alien_error()
        .context(ErrorData::GrpcConnectionFailed {
            endpoint: endpoint.clone(),
            reason: "Invalid gRPC endpoint format".to_string(),
        })?;

    let channel = unconnected
        .connect()
        .await
        .into_alien_error()
        .context(ErrorData::GrpcConnectionFailed {
            endpoint,
            reason: "Failed to connect to gRPC server".to_string(),
        })?;

    let fresh = ControlServiceClient::new(channel);
    *cached = Some(fresh.clone());
    Ok(fresh)
}
209
210 pub fn bindings(&self) -> &dyn BindingsProviderApi {
214 self.bindings_provider.as_ref()
215 }
216
217 pub fn get_bindings(&self) -> Arc<dyn BindingsProviderApi> {
219 Arc::clone(&self.bindings_provider)
220 }
221
222 pub fn on_storage_event<F, Fut>(&self, resource: &str, handler: F)
234 where
235 F: Fn(StorageEvent) -> Fut + Send + Sync + 'static,
236 Fut: Future<Output = Result<()>> + Send + 'static,
237 {
238 let resource = resource.to_string();
239 let handler = Box::new(move |event: StorageEvent| {
240 let fut = handler(event);
241 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
242 });
243
244 let handlers = self.handlers.clone();
245 let resource_clone = resource.clone();
246
247 tokio::spawn(async move {
249 let mut h = handlers.write().await;
250 h.storage.insert(resource_clone, handler);
251 });
252
253 info!(resource = %resource, "Registered storage event handler");
254 }
255
256 pub fn on_cron_event<F, Fut>(&self, schedule: &str, handler: F)
266 where
267 F: Fn(CronEvent) -> Fut + Send + Sync + 'static,
268 Fut: Future<Output = Result<()>> + Send + 'static,
269 {
270 let schedule = schedule.to_string();
271 let handler = Box::new(move |event: CronEvent| {
272 let fut = handler(event);
273 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
274 });
275
276 let handlers = self.handlers.clone();
277 let schedule_clone = schedule.clone();
278
279 tokio::spawn(async move {
280 let mut h = handlers.write().await;
281 h.cron.insert(schedule_clone, handler);
282 });
283
284 info!(schedule = %schedule, "Registered cron event handler");
285 }
286
287 pub fn on_queue_message<F, Fut>(&self, queue: &str, handler: F)
297 where
298 F: Fn(QueueMessage) -> Fut + Send + Sync + 'static,
299 Fut: Future<Output = Result<()>> + Send + 'static,
300 {
301 let queue = queue.to_string();
302 let handler = Box::new(move |message: QueueMessage| {
303 let fut = handler(message);
304 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
305 });
306
307 let handlers = self.handlers.clone();
308 let queue_clone = queue.clone();
309
310 tokio::spawn(async move {
311 let mut h = handlers.write().await;
312 h.queue.insert(queue_clone, handler);
313 });
314
315 info!(queue = %queue, "Registered queue message handler");
316 }
317
318 pub fn on_command<P, R, F, Fut>(&self, command: &str, handler: F)
328 where
329 P: DeserializeOwned + Send + 'static,
330 R: Serialize + Send + 'static,
331 F: Fn(P) -> Fut + Send + Sync + 'static,
332 Fut: Future<Output = Result<R>> + Send + 'static,
333 {
334 let command = command.to_string();
335 let handler = Box::new(move |params_bytes: Vec<u8>| {
336 let params: P = match serde_json::from_slice(¶ms_bytes) {
338 Ok(p) => p,
339 Err(e) => {
340 return Box::pin(async move {
341 Err(AlienError::new(ErrorData::DeserializationFailed {
342 message: format!("Failed to deserialize command params: {}", e),
343 type_name: std::any::type_name::<P>().to_string(),
344 }))
345 })
346 as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>;
347 }
348 };
349
350 let fut = handler(params);
351 Box::pin(async move {
352 let result = fut.await?;
353 serde_json::to_vec(&result).into_alien_error().context(
354 ErrorData::SerializationFailed {
355 message: "Failed to serialize command result".to_string(),
356 },
357 )
358 }) as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>
359 });
360
361 let handlers = self.handlers.clone();
362 let command_clone = command.clone();
363
364 tokio::spawn(async move {
365 let mut h = handlers.write().await;
366 h.command.insert(command_clone, handler);
367 });
368
369 info!(command = %command, "Registered command handler");
370 }
371
/// Tells the runtime which local `port` the application's HTTP server
/// listens on, via the control service's `RegisterHttpServer` call.
///
/// # Errors
///
/// Fails when the control client cannot be obtained or the gRPC call fails
/// (`GrpcCallFailed`).
#[cfg(feature = "grpc")]
pub async fn register_http_server(&self, port: u16) -> Result<()> {
    info!(port = port, "Registering HTTP server with runtime");

    let mut control = self.get_control_client().await?;

    let payload = RegisterHttpServerRequest {
        port: u32::from(port),
    };
    let call_outcome = control
        .register_http_server(tonic::Request::new(payload))
        .await;

    call_outcome
        .into_alien_error()
        .context(ErrorData::GrpcCallFailed {
            service: "ControlService".to_string(),
            method: "RegisterHttpServer".to_string(),
            reason: "gRPC call failed".to_string(),
        })?;

    info!(port = port, "HTTP server registered successfully");
    Ok(())
}
404
405 #[cfg(not(feature = "grpc"))]
406 pub async fn register_http_server(&self, _port: u16) -> Result<()> {
407 Err(AlienError::new(ErrorData::FeatureNotEnabled {
408 feature: "grpc".to_string(),
409 }))
410 }
411
/// Runs the task loop until the runtime closes or cancels the task stream.
///
/// First announces all registered handlers to the runtime, then opens the
/// `WaitForTasks` stream for this application id and dispatches each task
/// in arrival order. A failing task is logged and does not stop the loop;
/// a stream error is logged too, except `Cancelled`, which ends the loop.
///
/// # Errors
///
/// Fails when handler registration, client creation or opening the stream
/// fails. Errors after the stream is established are only logged.
#[cfg(feature = "grpc")]
pub async fn run(&self) -> Result<()> {
    info!(app_id = %self.app_id, "Entering event loop");

    self.register_handlers_with_runtime().await?;

    let mut control = self.get_control_client().await?;

    let subscribe = WaitForTasksRequest {
        application_id: self.app_id.clone(),
    };
    let response = control
        .wait_for_tasks(tonic::Request::new(subscribe))
        .await
        .into_alien_error()
        .context(ErrorData::GrpcCallFailed {
            service: "ControlService".to_string(),
            method: "WaitForTasks".to_string(),
            reason: "Failed to start task stream".to_string(),
        })?;
    let mut tasks = response.into_inner();

    info!("Task stream established, waiting for tasks");

    loop {
        match tasks.next().await {
            // Stream closed by the runtime.
            None => break,
            Some(Ok(task)) => {
                if let Err(e) = self.handle_task(task).await {
                    error!(error = %e, "Error handling task");
                }
            }
            Some(Err(status)) if status.code() == tonic::Code::Cancelled => {
                info!("Task stream cancelled, shutting down");
                break;
            }
            Some(Err(status)) => {
                error!(error = %status, "Error receiving task from stream");
            }
        }
    }

    info!("Task loop ended");
    Ok(())
}
473
474 #[cfg(not(feature = "grpc"))]
475 pub async fn run(&self) -> Result<()> {
476 Err(AlienError::new(ErrorData::FeatureNotEnabled {
477 feature: "grpc".to_string(),
478 }))
479 }
480
/// Announces every registered handler to the runtime through the control
/// service's `RegisterEventHandler` call.
///
/// Handler names are snapshotted under the registry read lock, and the lock
/// is released before any network traffic so registrations are not blocked
/// for the duration of the gRPC round trips. Handler types are announced in
/// the order storage, cron, queue, command.
///
/// # Errors
///
/// Fails when the control client cannot be obtained, or with
/// `GrpcCallFailed` on the first registration the runtime rejects; later
/// handlers are then not announced.
#[cfg(feature = "grpc")]
async fn register_handlers_with_runtime(&self) -> Result<()> {
    let registrations: Vec<(&'static str, String)> = {
        let handlers = self.handlers.read().await;
        let snapshot: Vec<(&'static str, String)> = handlers
            .storage
            .keys()
            .map(|name| ("storage", name.clone()))
            .chain(handlers.cron.keys().map(|name| ("cron", name.clone())))
            .chain(handlers.queue.keys().map(|name| ("queue", name.clone())))
            .chain(handlers.command.keys().map(|name| ("command", name.clone())))
            .collect();
        snapshot
    };

    let mut client = self.get_control_client().await?;

    for (handler_type, resource_name) in registrations {
        let request = tonic::Request::new(RegisterEventHandlerRequest {
            handler_type: handler_type.to_string(),
            resource_name: resource_name.clone(),
        });

        client
            .register_event_handler(request)
            .await
            .into_alien_error()
            .context(ErrorData::GrpcCallFailed {
                service: "ControlService".to_string(),
                method: "RegisterEventHandler".to_string(),
                reason: format!("Failed to register {} handler", handler_type),
            })?;

        debug!(
            handler_type = handler_type,
            resource = %resource_name,
            "Registered handler with runtime"
        );
    }

    Ok(())
}
561
/// Dispatches one task received from the runtime.
///
/// Storage, cron and queue payloads are converted to their public event
/// types, passed to the matching handler, and the handler outcome is
/// reported back with `send_task_result`. A task without payload is logged
/// and reported as success. Command payloads are delegated to
/// `handle_command`, which sends its own response.
///
/// # Errors
///
/// Returns an error when reporting the outcome to the runtime fails. For
/// commands this includes a failure to deliver the command response, which
/// is propagated to the caller instead of being dropped.
#[cfg(feature = "grpc")]
async fn handle_task(&self, task: Task) -> Result<()> {
    let task_id = task.task_id.clone();
    debug!(task_id = %task_id, "Handling task");

    let result = match task.payload {
        Some(TaskPayload::ArcCommand(cmd)) => {
            // Commands deliver their own response, handler failures included,
            // so no generic task result is sent for them. The only error that
            // can surface here is a failed delivery — propagate it.
            return self
                .handle_command(&task_id, &cmd.command_name, cmd.params, &cmd.response_url)
                .await;
        }
        Some(TaskPayload::StorageEvent(se)) => {
            self.handle_storage_event(
                &se.bucket,
                StorageEvent {
                    key: se.key,
                    event_type: se.event_type,
                    bucket: se.bucket.clone(),
                    size: se.size,
                    content_type: se.content_type,
                },
            )
            .await
        }
        Some(TaskPayload::CronEvent(ce)) => {
            // A missing timestamp, negative nanos or an out-of-range value
            // all fall back to "now" rather than failing the task.
            let scheduled_time = ce
                .scheduled_time
                .and_then(|ts| {
                    let nanos = u32::try_from(ts.nanos).ok()?;
                    chrono::DateTime::from_timestamp(ts.seconds, nanos)
                })
                .unwrap_or_else(chrono::Utc::now);

            self.handle_cron_event(
                &ce.schedule_name,
                CronEvent {
                    schedule_name: ce.schedule_name.clone(),
                    scheduled_time,
                },
            )
            .await
        }
        Some(TaskPayload::QueueMessage(qm)) => {
            self.handle_queue_message(
                &qm.source,
                QueueMessage {
                    id: qm.id,
                    payload: qm.payload,
                    receipt_handle: qm.receipt_handle,
                    source: qm.source.clone(),
                    attempt_count: qm.attempt_count,
                },
            )
            .await
        }
        None => {
            warn!(task_id = %task_id, "Received task with no payload");
            Ok(())
        }
    };

    self.send_task_result(&task_id, result).await
}
633
634 async fn handle_storage_event(&self, bucket: &str, event: StorageEvent) -> Result<()> {
635 let handlers = self.handlers.read().await;
636 if let Some(handler) = handlers
638 .storage
639 .get(bucket)
640 .or_else(|| handlers.storage.get("*"))
641 {
642 handler(event).await
643 } else {
644 warn!(bucket = %bucket, "No handler registered for storage event");
645 Ok(())
646 }
647 }
648
649 async fn handle_cron_event(&self, schedule: &str, event: CronEvent) -> Result<()> {
650 let handlers = self.handlers.read().await;
651 if let Some(handler) = handlers
653 .cron
654 .get(schedule)
655 .or_else(|| handlers.cron.get("*"))
656 {
657 handler(event).await
658 } else {
659 warn!(schedule = %schedule, "No handler registered for cron event");
660 Ok(())
661 }
662 }
663
664 async fn handle_queue_message(&self, queue: &str, message: QueueMessage) -> Result<()> {
665 let handlers = self.handlers.read().await;
666 if let Some(handler) = handlers
668 .queue
669 .get(queue)
670 .or_else(|| handlers.queue.get("*"))
671 {
672 handler(message).await
673 } else {
674 warn!(queue = %queue, "No handler registered for queue message");
675 Ok(())
676 }
677 }
678
/// Executes the handler registered for `command` and sends its outcome back
/// to the runtime as the result of task `event_id`.
///
/// A handler error is reported as its string form; a missing handler is
/// logged and reported as an error as well. Commands are matched by exact
/// name (no `"*"` fallback). `_response_url` is currently unused.
///
/// # Errors
///
/// Returns an error only when sending the response fails.
#[cfg(feature = "grpc")]
async fn handle_command(
    &self,
    event_id: &str,
    command: &str,
    params: Vec<u8>,
    _response_url: &str,
) -> Result<()> {
    let registry = self.handlers.read().await;

    let outcome: std::result::Result<Vec<u8>, String> = match registry.command.get(command) {
        Some(callback) => callback(params).await.map_err(|e| e.to_string()),
        None => {
            warn!(command = %command, "No handler registered for command");
            Err(format!("No handler for command: {}", command))
        }
    };

    self.send_command_response(event_id, outcome).await
}
713
/// Reports the outcome of an event task to the runtime via `SendTaskResult`.
///
/// Success is sent with an empty response body; a failure is sent with code
/// `HANDLER_ERROR` and the error's string form as the message.
///
/// # Errors
///
/// Fails when the control client cannot be obtained or the gRPC call fails
/// (`GrpcCallFailed`).
#[cfg(feature = "grpc")]
async fn send_task_result(&self, task_id: &str, result: Result<()>) -> Result<()> {
    let mut control = self.get_control_client().await?;

    let outcome = result
        .map(|()| {
            TaskResult::Success(TaskSuccess {
                response_data: Vec::new(),
            })
        })
        .unwrap_or_else(|e| {
            TaskResult::Error(TaskError {
                code: "HANDLER_ERROR".to_string(),
                message: e.to_string(),
            })
        });

    let payload = SendTaskResultRequest {
        task_id: task_id.to_string(),
        result: Some(outcome),
    };

    control
        .send_task_result(tonic::Request::new(payload))
        .await
        .into_alien_error()
        .context(ErrorData::GrpcCallFailed {
            service: "ControlService".to_string(),
            method: "SendTaskResult".to_string(),
            reason: "Failed to send task result".to_string(),
        })?;

    Ok(())
}
745
/// Sends a command's outcome to the runtime via `SendTaskResult`.
///
/// `Ok(bytes)` is sent as a success carrying the response bytes; `Err(msg)`
/// is sent as an error with code `COMMAND_ERROR` and `msg` as the message.
///
/// # Errors
///
/// Fails when the control client cannot be obtained or the gRPC call fails
/// (`GrpcCallFailed`).
#[cfg(feature = "grpc")]
async fn send_command_response(
    &self,
    task_id: &str,
    result: std::result::Result<Vec<u8>, String>,
) -> Result<()> {
    let mut client = self.get_control_client().await?;

    let task_result = match result {
        Ok(data) => TaskResult::Success(TaskSuccess {
            response_data: data,
        }),
        Err(e) => TaskResult::Error(TaskError {
            code: "COMMAND_ERROR".to_string(),
            message: e,
        }),
    };

    let request = tonic::Request::new(SendTaskResultRequest {
        task_id: task_id.to_string(),
        result: Some(task_result),
    });

    client
        .send_task_result(request)
        .await
        .into_alien_error()
        .context(ErrorData::GrpcCallFailed {
            service: "ControlService".to_string(),
            method: "SendTaskResult".to_string(),
            reason: "Failed to send command response".to_string(),
        })?;

    // Use the structured logger like the rest of this type; an unconditional
    // stderr print cannot be filtered or disabled by the application.
    debug!(task_id = %task_id, "Command response sent");
    Ok(())
}
782
783 pub fn wait_until<F, Fut>(&self, task_fn: F) -> Result<()>
788 where
789 F: FnOnce() -> Fut + Send + 'static,
790 Fut: std::future::Future<Output = ()> + Send + 'static,
791 {
792 self.wait_until_context.wait_until(task_fn)
793 }
794
795 pub fn application_id(&self) -> &str {
799 &self.app_id
800 }
801
802 pub async fn get_task_count(&self) -> Result<u32> {
804 self.wait_until_context.get_task_count().await
805 }
806
807 pub async fn get_current_worker(&self) -> Result<Option<Arc<dyn crate::traits::Worker>>> {
809 if let Some(current_worker_name) = self.env_vars.get(ENV_ALIEN_CURRENT_WORKER_BINDING_NAME)
810 {
811 Ok(Some(
812 self.bindings_provider
813 .load_worker(current_worker_name)
814 .await?,
815 ))
816 } else {
817 Ok(None)
818 }
819 }
820
821 pub async fn get_current_container(&self) -> Result<Option<Arc<dyn crate::traits::Container>>> {
835 if let Some(current_container_name) =
836 self.env_vars.get(ENV_ALIEN_CURRENT_CONTAINER_BINDING_NAME)
837 {
838 Ok(Some(
839 self.bindings_provider
840 .load_container(current_container_name)
841 .await?,
842 ))
843 } else {
844 Ok(None)
845 }
846 }
847}