Skip to main content

alien_bindings/
alien_context.rs

1//! AlienContext - Main SDK entry point for Alien applications.
2//!
3//! Provides access to:
4//! - Resource bindings (storage, kv, queue, vault, etc.)
5//! - Event handlers (storage events, cron events, queue messages, commands)
6//! - Background tasks (wait_until)
7//! - HTTP server registration
8
9use serde::{de::DeserializeOwned, Serialize};
10use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
11use tokio::sync::{Mutex, RwLock};
12#[cfg(feature = "grpc")]
13use tokio_stream::StreamExt as _;
14use tracing::{debug, error, info, warn};
15
16use crate::error::{ErrorData, Result};
17use crate::wait_until::WaitUntilContext;
18use crate::{BindingsMode, BindingsProvider, BindingsProviderApi, WaitUntil};
19use alien_core::{ENV_ALIEN_CURRENT_CONTAINER_BINDING_NAME, ENV_ALIEN_CURRENT_WORKER_BINDING_NAME};
20use alien_error::{AlienError, Context, IntoAlienError};
21
22#[cfg(feature = "grpc")]
23use crate::grpc::control_service::alien_bindings::control::{
24    control_service_client::ControlServiceClient, send_task_result_request::Result as TaskResult,
25    task::Payload as TaskPayload, RegisterEventHandlerRequest, RegisterHttpServerRequest,
26    SendTaskResultRequest, Task, TaskError, TaskSuccess, WaitForTasksRequest,
27};
28
/// Storage event delivered to handlers registered with `on_storage_event`.
///
/// Derives `PartialEq`/`Eq` so events can be compared by value (for example in
/// tests that assert on what a handler received).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageEvent {
    /// Key of the object the event refers to.
    pub key: String,
    /// Event type string, passed through from the runtime unmodified.
    pub event_type: String,
    /// Bucket the event originated from; also used to select the handler.
    pub bucket: String,
    /// Object size as reported by the runtime (presumably bytes — confirm against the runtime).
    pub size: u64,
    /// Content type as reported by the runtime.
    pub content_type: String,
}
38
39/// Cron event delivered to handlers  
40#[derive(Debug, Clone)]
41pub struct CronEvent {
42    pub schedule_name: String,
43    pub scheduled_time: chrono::DateTime<chrono::Utc>,
44}
45
/// Queue message delivered to handlers registered with `on_queue_message`.
///
/// Derives `PartialEq`/`Eq` so messages can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueMessage {
    /// Message identifier as provided by the runtime.
    pub id: String,
    /// Raw message body.
    pub payload: Vec<u8>,
    /// Receipt handle as provided by the runtime.
    pub receipt_handle: String,
    /// Source the message came from; also used to select the handler.
    pub source: String,
    /// Attempt counter as provided by the runtime.
    pub attempt_count: u32,
}
55
56/// Type alias for event handler functions
57type StorageHandler =
58    Box<dyn Fn(StorageEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
59type CronHandler =
60    Box<dyn Fn(CronEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
61type QueueHandler =
62    Box<dyn Fn(QueueMessage) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
63type CommandHandler =
64    Box<dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>> + Send + Sync>;
65
66/// Registered handlers
67struct Handlers {
68    storage: HashMap<String, StorageHandler>,
69    cron: HashMap<String, CronHandler>,
70    queue: HashMap<String, QueueHandler>,
71    command: HashMap<String, CommandHandler>,
72}
73
74impl Default for Handlers {
75    fn default() -> Self {
76        Self {
77            storage: HashMap::new(),
78            cron: HashMap::new(),
79            queue: HashMap::new(),
80            command: HashMap::new(),
81        }
82    }
83}
84
85/// Main context for Alien applications that provides access to:
86/// - Resource bindings (storage, kv, queue, vault, etc.)
87/// - Event handlers (storage events, cron events, queue messages, commands)
88/// - Background tasks (wait_until)
89/// - HTTP server registration
90pub struct AlienContext {
91    /// The wait_until context for managing background tasks
92    wait_until_context: Arc<WaitUntilContext>,
93    /// The bindings provider for accessing resources
94    bindings_provider: Arc<dyn BindingsProviderApi>,
95    /// Application ID
96    app_id: String,
97    /// Environment variables
98    env_vars: HashMap<String, String>,
99    /// Registered event handlers
100    handlers: Arc<RwLock<Handlers>>,
101    /// gRPC control client (lazy initialized)
102    #[cfg(feature = "grpc")]
103    control_client: Arc<Mutex<Option<ControlServiceClient<tonic::transport::Channel>>>>,
104}
105
106impl AlienContext {
107    /// Creates a new AlienContext from environment variables.
108    /// This automatically sets up gRPC communication and starts the drain listener.
109    pub async fn from_env() -> Result<Self> {
110        Self::from_env_with_vars(&std::env::vars().collect()).await
111    }
112
113    /// Creates a new AlienContext from provided environment variables.
114    /// This is useful for testing or when environment variables are not available via std::env.
115    pub async fn from_env_with_vars(env_vars: &HashMap<String, String>) -> Result<Self> {
116        // Choose the appropriate bindings provider based on ALIEN_BINDINGS_MODE
117        let bindings_mode = crate::get_bindings_mode_from_env(env_vars)?;
118
119        let bindings_provider: Arc<dyn BindingsProviderApi> = match bindings_mode {
120            BindingsMode::Grpc => {
121                #[cfg(feature = "grpc")]
122                {
123                    use crate::providers::grpc_provider::GrpcBindingsProvider;
124                    Arc::new(GrpcBindingsProvider::new_with_env(env_vars.clone())?)
125                }
126                #[cfg(not(feature = "grpc"))]
127                {
128                    return Err(AlienError::new(ErrorData::FeatureNotEnabled {
129                        feature: "grpc".to_string(),
130                    }));
131                }
132            }
133            BindingsMode::Direct => Arc::new(BindingsProvider::from_env(env_vars.clone()).await?),
134        };
135
136        let app_id = uuid::Uuid::new_v4().to_string();
137
138        let wait_until_context =
139            Arc::new(WaitUntilContext::from_env_with_vars(Some(app_id.clone()), env_vars).await?);
140
141        // Start the drain listener automatically
142        wait_until_context.start_drain_listener().await?;
143
144        Ok(Self {
145            wait_until_context,
146            bindings_provider,
147            app_id,
148            env_vars: env_vars.clone(),
149            handlers: Arc::new(RwLock::new(Handlers::default())),
150            #[cfg(feature = "grpc")]
151            control_client: Arc::new(Mutex::new(None)),
152        })
153    }
154
155    /// Creates a new AlienContext with custom provider and wait_until context.
156    /// This is mainly useful for testing or advanced use cases.
157    pub fn new(
158        wait_until_context: Arc<WaitUntilContext>,
159        bindings_provider: Arc<dyn BindingsProviderApi>,
160    ) -> Self {
161        Self {
162            app_id: wait_until_context.application_id().to_string(),
163            wait_until_context,
164            bindings_provider,
165            env_vars: std::env::vars().collect(),
166            handlers: Arc::new(RwLock::new(Handlers::default())),
167            #[cfg(feature = "grpc")]
168            control_client: Arc::new(Mutex::new(None)),
169        }
170    }
171
172    /// Gets the gRPC control client, creating it if needed
173    #[cfg(feature = "grpc")]
174    async fn get_control_client(&self) -> Result<ControlServiceClient<tonic::transport::Channel>> {
175        let mut client_guard = self.control_client.lock().await;
176
177        if let Some(client) = client_guard.as_ref() {
178            return Ok(client.clone());
179        }
180
181        let grpc_address = self
182            .env_vars
183            .get("ALIEN_BINDINGS_GRPC_ADDRESS")
184            .ok_or_else(|| {
185                AlienError::new(ErrorData::EnvironmentVariableMissing {
186                    variable_name: "ALIEN_BINDINGS_GRPC_ADDRESS".to_string(),
187                })
188            })?;
189
190        let endpoint = format!("http://{}", grpc_address);
191        let channel = tonic::transport::Channel::from_shared(endpoint.clone())
192            .into_alien_error()
193            .context(ErrorData::GrpcConnectionFailed {
194                endpoint: endpoint.clone(),
195                reason: "Invalid gRPC endpoint format".to_string(),
196            })?
197            .connect()
198            .await
199            .into_alien_error()
200            .context(ErrorData::GrpcConnectionFailed {
201                endpoint,
202                reason: "Failed to connect to gRPC server".to_string(),
203            })?;
204
205        let client = ControlServiceClient::new(channel);
206        *client_guard = Some(client.clone());
207        Ok(client)
208    }
209
210    // ==================== BINDINGS ====================
211
212    /// Gets the bindings provider for accessing storage, build, and other resources.
213    pub fn bindings(&self) -> &dyn BindingsProviderApi {
214        self.bindings_provider.as_ref()
215    }
216
217    /// Gets the bindings provider as an Arc
218    pub fn get_bindings(&self) -> Arc<dyn BindingsProviderApi> {
219        Arc::clone(&self.bindings_provider)
220    }
221
222    // ==================== EVENT HANDLERS ====================
223
224    /// Registers a handler for storage events on the specified bucket/resource.
225    ///
226    /// # Example
227    /// ```ignore
228    /// ctx.on_storage_event("uploads", |event| async move {
229    ///     println!("File {} was {}", event.key, event.event_type);
230    ///     Ok(())
231    /// });
232    /// ```
233    pub fn on_storage_event<F, Fut>(&self, resource: &str, handler: F)
234    where
235        F: Fn(StorageEvent) -> Fut + Send + Sync + 'static,
236        Fut: Future<Output = Result<()>> + Send + 'static,
237    {
238        let resource = resource.to_string();
239        let handler = Box::new(move |event: StorageEvent| {
240            let fut = handler(event);
241            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
242        });
243
244        let handlers = self.handlers.clone();
245        let resource_clone = resource.clone();
246
247        // Register in background to avoid blocking
248        tokio::spawn(async move {
249            let mut h = handlers.write().await;
250            h.storage.insert(resource_clone, handler);
251        });
252
253        info!(resource = %resource, "Registered storage event handler");
254    }
255
256    /// Registers a handler for cron/scheduled events.
257    ///
258    /// # Example
259    /// ```ignore
260    /// ctx.on_cron_event("daily-cleanup", |event| async move {
261    ///     cleanup_old_files().await;
262    ///     Ok(())
263    /// });
264    /// ```
265    pub fn on_cron_event<F, Fut>(&self, schedule: &str, handler: F)
266    where
267        F: Fn(CronEvent) -> Fut + Send + Sync + 'static,
268        Fut: Future<Output = Result<()>> + Send + 'static,
269    {
270        let schedule = schedule.to_string();
271        let handler = Box::new(move |event: CronEvent| {
272            let fut = handler(event);
273            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
274        });
275
276        let handlers = self.handlers.clone();
277        let schedule_clone = schedule.clone();
278
279        tokio::spawn(async move {
280            let mut h = handlers.write().await;
281            h.cron.insert(schedule_clone, handler);
282        });
283
284        info!(schedule = %schedule, "Registered cron event handler");
285    }
286
287    /// Registers a handler for queue messages.
288    ///
289    /// # Example
290    /// ```ignore
291    /// ctx.on_queue_message("tasks", |message| async move {
292    ///     process_task(&message.payload).await;
293    ///     Ok(())
294    /// });
295    /// ```
296    pub fn on_queue_message<F, Fut>(&self, queue: &str, handler: F)
297    where
298        F: Fn(QueueMessage) -> Fut + Send + Sync + 'static,
299        Fut: Future<Output = Result<()>> + Send + 'static,
300    {
301        let queue = queue.to_string();
302        let handler = Box::new(move |message: QueueMessage| {
303            let fut = handler(message);
304            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
305        });
306
307        let handlers = self.handlers.clone();
308        let queue_clone = queue.clone();
309
310        tokio::spawn(async move {
311            let mut h = handlers.write().await;
312            h.queue.insert(queue_clone, handler);
313        });
314
315        info!(queue = %queue, "Registered queue message handler");
316    }
317
318    /// Registers a command handler for remote command calls.
319    ///
320    /// # Example
321    /// ```ignore
322    /// ctx.on_command::<GenerateReportParams, ReportResult>("generate-report", |params| async move {
323    ///     let report = generate_report(params.start_date, params.end_date).await?;
324    ///     Ok(report)
325    /// });
326    /// ```
327    pub fn on_command<P, R, F, Fut>(&self, command: &str, handler: F)
328    where
329        P: DeserializeOwned + Send + 'static,
330        R: Serialize + Send + 'static,
331        F: Fn(P) -> Fut + Send + Sync + 'static,
332        Fut: Future<Output = Result<R>> + Send + 'static,
333    {
334        let command = command.to_string();
335        let handler = Box::new(move |params_bytes: Vec<u8>| {
336            // Deserialize params
337            let params: P = match serde_json::from_slice(&params_bytes) {
338                Ok(p) => p,
339                Err(e) => {
340                    return Box::pin(async move {
341                        Err(AlienError::new(ErrorData::DeserializationFailed {
342                            message: format!("Failed to deserialize command params: {}", e),
343                            type_name: std::any::type_name::<P>().to_string(),
344                        }))
345                    })
346                        as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>;
347                }
348            };
349
350            let fut = handler(params);
351            Box::pin(async move {
352                let result = fut.await?;
353                serde_json::to_vec(&result).into_alien_error().context(
354                    ErrorData::SerializationFailed {
355                        message: "Failed to serialize command result".to_string(),
356                    },
357                )
358            }) as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>
359        });
360
361        let handlers = self.handlers.clone();
362        let command_clone = command.clone();
363
364        tokio::spawn(async move {
365            let mut h = handlers.write().await;
366            h.command.insert(command_clone, handler);
367        });
368
369        info!(command = %command, "Registered command handler");
370    }
371
372    // ==================== HTTP SERVER ====================
373
374    /// Registers the application's HTTP server port with the runtime.
375    /// The runtime will forward HTTP requests to this port.
376    ///
377    /// # Example
378    /// ```ignore
379    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
380    /// let port = listener.local_addr()?.port();
381    /// ctx.register_http_server(port).await?;
382    /// ```
383    #[cfg(feature = "grpc")]
384    pub async fn register_http_server(&self, port: u16) -> Result<()> {
385        info!(port = port, "Registering HTTP server with runtime");
386
387        let mut client = self.get_control_client().await?;
388
389        let request = tonic::Request::new(RegisterHttpServerRequest { port: port as u32 });
390
391        client
392            .register_http_server(request)
393            .await
394            .into_alien_error()
395            .context(ErrorData::GrpcCallFailed {
396                service: "ControlService".to_string(),
397                method: "RegisterHttpServer".to_string(),
398                reason: "gRPC call failed".to_string(),
399            })?;
400
401        info!(port = port, "HTTP server registered successfully");
402        Ok(())
403    }
404
405    #[cfg(not(feature = "grpc"))]
406    pub async fn register_http_server(&self, _port: u16) -> Result<()> {
407        Err(AlienError::new(ErrorData::FeatureNotEnabled {
408            feature: "grpc".to_string(),
409        }))
410    }
411
412    // ==================== EVENT LOOP ====================
413
414    /// Enters the main event loop and processes events from the runtime.
415    /// This blocks until shutdown is signaled.
416    ///
417    /// Call this after registering all event handlers.
418    ///
419    /// # Example
420    /// ```ignore
421    /// ctx.on_storage_event("uploads", handler);
422    /// ctx.on_command("process", cmd_handler);
423    /// ctx.run().await?;
424    /// ```
425    #[cfg(feature = "grpc")]
426    pub async fn run(&self) -> Result<()> {
427        info!(app_id = %self.app_id, "Entering event loop");
428
429        // Register handlers with runtime
430        self.register_handlers_with_runtime().await?;
431
432        // Get control client and start task stream
433        let mut client = self.get_control_client().await?;
434
435        let request = tonic::Request::new(WaitForTasksRequest {
436            application_id: self.app_id.clone(),
437        });
438
439        let mut stream = client
440            .wait_for_tasks(request)
441            .await
442            .into_alien_error()
443            .context(ErrorData::GrpcCallFailed {
444                service: "ControlService".to_string(),
445                method: "WaitForTasks".to_string(),
446                reason: "Failed to start task stream".to_string(),
447            })?
448            .into_inner();
449
450        info!("Task stream established, waiting for tasks");
451
452        // Process tasks until stream ends
453        while let Some(task_result) = stream.next().await {
454            match task_result {
455                Ok(task) => {
456                    if let Err(e) = self.handle_task(task).await {
457                        error!(error = %e, "Error handling task");
458                    }
459                }
460                Err(status) => {
461                    if status.code() == tonic::Code::Cancelled {
462                        info!("Task stream cancelled, shutting down");
463                        break;
464                    }
465                    error!(error = %status, "Error receiving task from stream");
466                }
467            }
468        }
469
470        info!("Task loop ended");
471        Ok(())
472    }
473
474    #[cfg(not(feature = "grpc"))]
475    pub async fn run(&self) -> Result<()> {
476        Err(AlienError::new(ErrorData::FeatureNotEnabled {
477            feature: "grpc".to_string(),
478        }))
479    }
480
481    /// Register all handlers with the runtime
482    #[cfg(feature = "grpc")]
483    async fn register_handlers_with_runtime(&self) -> Result<()> {
484        let handlers = self.handlers.read().await;
485        let mut client = self.get_control_client().await?;
486
487        // Register storage handlers
488        for resource in handlers.storage.keys() {
489            let request = tonic::Request::new(RegisterEventHandlerRequest {
490                handler_type: "storage".to_string(),
491                resource_name: resource.clone(),
492            });
493            client
494                .register_event_handler(request)
495                .await
496                .into_alien_error()
497                .context(ErrorData::GrpcCallFailed {
498                    service: "ControlService".to_string(),
499                    method: "RegisterEventHandler".to_string(),
500                    reason: "Failed to register storage handler".to_string(),
501                })?;
502            debug!(handler_type = "storage", resource = %resource, "Registered handler with runtime");
503        }
504
505        // Register cron handlers
506        for schedule in handlers.cron.keys() {
507            let request = tonic::Request::new(RegisterEventHandlerRequest {
508                handler_type: "cron".to_string(),
509                resource_name: schedule.clone(),
510            });
511            client
512                .register_event_handler(request)
513                .await
514                .into_alien_error()
515                .context(ErrorData::GrpcCallFailed {
516                    service: "ControlService".to_string(),
517                    method: "RegisterEventHandler".to_string(),
518                    reason: "Failed to register cron handler".to_string(),
519                })?;
520            debug!(handler_type = "cron", resource = %schedule, "Registered handler with runtime");
521        }
522
523        // Register queue handlers
524        for queue in handlers.queue.keys() {
525            let request = tonic::Request::new(RegisterEventHandlerRequest {
526                handler_type: "queue".to_string(),
527                resource_name: queue.clone(),
528            });
529            client
530                .register_event_handler(request)
531                .await
532                .into_alien_error()
533                .context(ErrorData::GrpcCallFailed {
534                    service: "ControlService".to_string(),
535                    method: "RegisterEventHandler".to_string(),
536                    reason: "Failed to register queue handler".to_string(),
537                })?;
538            debug!(handler_type = "queue", resource = %queue, "Registered handler with runtime");
539        }
540
541        // Register command handlers
542        for command in handlers.command.keys() {
543            let request = tonic::Request::new(RegisterEventHandlerRequest {
544                handler_type: "command".to_string(),
545                resource_name: command.clone(),
546            });
547            client
548                .register_event_handler(request)
549                .await
550                .into_alien_error()
551                .context(ErrorData::GrpcCallFailed {
552                    service: "ControlService".to_string(),
553                    method: "RegisterEventHandler".to_string(),
554                    reason: "Failed to register command handler".to_string(),
555                })?;
556            debug!(handler_type = "command", resource = %command, "Registered handler with runtime");
557        }
558
559        Ok(())
560    }
561
562    /// Handle a single task from the runtime
563    #[cfg(feature = "grpc")]
564    async fn handle_task(&self, task: Task) -> Result<()> {
565        let task_id = task.task_id.clone();
566        debug!(task_id = %task_id, "Handling task");
567
568        // Check if it's a command before consuming the payload
569        let is_command = matches!(&task.payload, Some(TaskPayload::ArcCommand(_)));
570
571        let result = match task.payload {
572            Some(TaskPayload::StorageEvent(se)) => {
573                self.handle_storage_event(
574                    &se.bucket,
575                    StorageEvent {
576                        key: se.key,
577                        event_type: se.event_type,
578                        bucket: se.bucket.clone(),
579                        size: se.size,
580                        content_type: se.content_type,
581                    },
582                )
583                .await
584            }
585            Some(TaskPayload::CronEvent(ce)) => {
586                let scheduled_time = ce
587                    .scheduled_time
588                    .map(|ts| {
589                        chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
590                            .unwrap_or_else(chrono::Utc::now)
591                    })
592                    .unwrap_or_else(chrono::Utc::now);
593
594                self.handle_cron_event(
595                    &ce.schedule_name,
596                    CronEvent {
597                        schedule_name: ce.schedule_name.clone(),
598                        scheduled_time,
599                    },
600                )
601                .await
602            }
603            Some(TaskPayload::QueueMessage(qm)) => {
604                self.handle_queue_message(
605                    &qm.source,
606                    QueueMessage {
607                        id: qm.id,
608                        payload: qm.payload,
609                        receipt_handle: qm.receipt_handle,
610                        source: qm.source.clone(),
611                        attempt_count: qm.attempt_count,
612                    },
613                )
614                .await
615            }
616            Some(TaskPayload::ArcCommand(cmd)) => {
617                self.handle_command(&task_id, &cmd.command_name, cmd.params, &cmd.response_url)
618                    .await
619            }
620            None => {
621                warn!(task_id = %task_id, "Received task with no payload");
622                Ok(())
623            }
624        };
625
626        // For non-command tasks, send result to runtime
627        if !is_command {
628            self.send_task_result(&task_id, result).await?;
629        }
630
631        Ok(())
632    }
633
634    async fn handle_storage_event(&self, bucket: &str, event: StorageEvent) -> Result<()> {
635        let handlers = self.handlers.read().await;
636        // Try exact match first, then wildcard
637        if let Some(handler) = handlers
638            .storage
639            .get(bucket)
640            .or_else(|| handlers.storage.get("*"))
641        {
642            handler(event).await
643        } else {
644            warn!(bucket = %bucket, "No handler registered for storage event");
645            Ok(())
646        }
647    }
648
649    async fn handle_cron_event(&self, schedule: &str, event: CronEvent) -> Result<()> {
650        let handlers = self.handlers.read().await;
651        // Try exact match first, then wildcard
652        if let Some(handler) = handlers
653            .cron
654            .get(schedule)
655            .or_else(|| handlers.cron.get("*"))
656        {
657            handler(event).await
658        } else {
659            warn!(schedule = %schedule, "No handler registered for cron event");
660            Ok(())
661        }
662    }
663
664    async fn handle_queue_message(&self, queue: &str, message: QueueMessage) -> Result<()> {
665        let handlers = self.handlers.read().await;
666        // Try exact match first, then wildcard
667        if let Some(handler) = handlers
668            .queue
669            .get(queue)
670            .or_else(|| handlers.queue.get("*"))
671        {
672            handler(message).await
673        } else {
674            warn!(queue = %queue, "No handler registered for queue message");
675            Ok(())
676        }
677    }
678
679    #[cfg(feature = "grpc")]
680    async fn handle_command(
681        &self,
682        event_id: &str,
683        command: &str,
684        params: Vec<u8>,
685        _response_url: &str,
686    ) -> Result<()> {
687        let handlers = self.handlers.read().await;
688
689        let result = if let Some(handler) = handlers.command.get(command) {
690            match handler(params).await {
691                Ok(response_data) => {
692                    // Send success response
693                    self.send_command_response(event_id, Ok(response_data))
694                        .await
695                }
696                Err(e) => {
697                    // Send error response
698                    self.send_command_response(event_id, Err(e.to_string()))
699                        .await
700                }
701            }
702        } else {
703            warn!(command = %command, "No handler registered for command");
704            self.send_command_response(
705                event_id,
706                Err(format!("No handler for command: {}", command)),
707            )
708            .await
709        };
710
711        result
712    }
713
714    #[cfg(feature = "grpc")]
715    async fn send_task_result(&self, task_id: &str, result: Result<()>) -> Result<()> {
716        let mut client = self.get_control_client().await?;
717
718        let task_result = match result {
719            Ok(()) => TaskResult::Success(TaskSuccess {
720                response_data: vec![],
721            }),
722            Err(e) => TaskResult::Error(TaskError {
723                code: "HANDLER_ERROR".to_string(),
724                message: e.to_string(),
725            }),
726        };
727
728        let request = tonic::Request::new(SendTaskResultRequest {
729            task_id: task_id.to_string(),
730            result: Some(task_result),
731        });
732
733        client
734            .send_task_result(request)
735            .await
736            .into_alien_error()
737            .context(ErrorData::GrpcCallFailed {
738                service: "ControlService".to_string(),
739                method: "SendTaskResult".to_string(),
740                reason: "Failed to send task result".to_string(),
741            })?;
742
743        Ok(())
744    }
745
746    #[cfg(feature = "grpc")]
747    async fn send_command_response(
748        &self,
749        task_id: &str,
750        result: std::result::Result<Vec<u8>, String>,
751    ) -> Result<()> {
752        let mut client = self.get_control_client().await?;
753
754        let task_result = match result {
755            Ok(data) => TaskResult::Success(TaskSuccess {
756                response_data: data,
757            }),
758            Err(e) => TaskResult::Error(TaskError {
759                code: "COMMAND_ERROR".to_string(),
760                message: e,
761            }),
762        };
763
764        let request = tonic::Request::new(SendTaskResultRequest {
765            task_id: task_id.to_string(),
766            result: Some(task_result),
767        });
768
769        client
770            .send_task_result(request)
771            .await
772            .into_alien_error()
773            .context(ErrorData::GrpcCallFailed {
774                service: "ControlService".to_string(),
775                method: "SendTaskResult".to_string(),
776                reason: "Failed to send command response".to_string(),
777            })?;
778
779        eprintln!("[ALIEN_CONTEXT] send_task_result succeeded");
780        Ok(())
781    }
782
783    // ==================== WAIT UNTIL ====================
784
785    /// Registers a background task that will run even after the main handler returns.
786    /// The task runs in the application process and is tracked by the runtime for proper shutdown coordination.
787    pub fn wait_until<F, Fut>(&self, task_fn: F) -> Result<()>
788    where
789        F: FnOnce() -> Fut + Send + 'static,
790        Fut: std::future::Future<Output = ()> + Send + 'static,
791    {
792        self.wait_until_context.wait_until(task_fn)
793    }
794
795    // ==================== UTILITIES ====================
796
797    /// Gets the application ID for this context.
798    pub fn application_id(&self) -> &str {
799        &self.app_id
800    }
801
802    /// Gets the current number of registered wait_until tasks.
803    pub async fn get_task_count(&self) -> Result<u32> {
804        self.wait_until_context.get_task_count().await
805    }
806
807    /// Gets the current worker binding if available.
808    pub async fn get_current_worker(&self) -> Result<Option<Arc<dyn crate::traits::Worker>>> {
809        if let Some(current_worker_name) = self.env_vars.get(ENV_ALIEN_CURRENT_WORKER_BINDING_NAME)
810        {
811            Ok(Some(
812                self.bindings_provider
813                    .load_worker(current_worker_name)
814                    .await?,
815            ))
816        } else {
817            Ok(None)
818        }
819    }
820
821    /// Gets the current container binding if available.
822    ///
823    /// This allows a container to discover its own public and internal URLs.
824    /// Useful for constructing callback URLs, OAuth redirects, etc.
825    ///
826    /// # Example
827    /// ```ignore
828    /// let ctx = AlienContext::from_env().await?;
829    /// if let Some(container) = ctx.get_current_container().await? {
830    ///     let public_url = container.get_public_url();
831    ///     let callback_url = format!("{}/callback", public_url.unwrap_or(""));
832    /// }
833    /// ```
834    pub async fn get_current_container(&self) -> Result<Option<Arc<dyn crate::traits::Container>>> {
835        if let Some(current_container_name) =
836            self.env_vars.get(ENV_ALIEN_CURRENT_CONTAINER_BINDING_NAME)
837        {
838            Ok(Some(
839                self.bindings_provider
840                    .load_container(current_container_name)
841                    .await?,
842            ))
843        } else {
844            Ok(None)
845        }
846    }
847}