Skip to main content

alien_bindings/
alien_context.rs

1//! AlienContext - Main SDK entry point for Alien applications.
2//!
3//! Provides access to:
4//! - Resource bindings (storage, kv, queue, vault, etc.)
5//! - Event handlers (storage events, cron events, queue messages, commands)
6//! - Background tasks (wait_until)
7//! - HTTP server registration
8
9use serde::{de::DeserializeOwned, Serialize};
10use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
11use tokio::sync::{Mutex, RwLock};
12#[cfg(feature = "grpc")]
13use tokio_stream::StreamExt as _;
14use tracing::{debug, error, info, warn};
15
16use crate::error::{ErrorData, Result};
17use crate::wait_until::WaitUntilContext;
18use crate::{BindingsMode, BindingsProvider, BindingsProviderApi, WaitUntil};
19use alien_error::{AlienError, Context, IntoAlienError};
20
21#[cfg(feature = "grpc")]
22use crate::grpc::control_service::alien_bindings::control::{
23    control_service_client::ControlServiceClient, send_task_result_request::Result as TaskResult,
24    task::Payload as TaskPayload, RegisterEventHandlerRequest, RegisterHttpServerRequest,
25    SendTaskResultRequest, Task, TaskError, TaskSuccess, WaitForTasksRequest,
26};
27
/// A storage notification passed to handlers registered with `on_storage_event`.
///
/// All fields are copied as-is from the storage payload of a runtime task.
#[derive(Clone, Debug)]
pub struct StorageEvent {
    /// Key of the object the event refers to.
    pub key: String,
    /// Event kind, as a free-form string supplied by the runtime.
    pub event_type: String,
    /// Bucket/resource name; also used to pick the handler for the event.
    pub bucket: String,
    /// Object size as reported by the runtime (presumably bytes — confirm against the proto).
    pub size: u64,
    /// Content type as reported by the runtime.
    pub content_type: String,
}
37
38/// Cron event delivered to handlers  
39#[derive(Debug, Clone)]
40pub struct CronEvent {
41    pub schedule_name: String,
42    pub scheduled_time: chrono::DateTime<chrono::Utc>,
43}
44
/// A queue message passed to handlers registered with `on_queue_message`.
///
/// All fields are copied as-is from the queue payload of a runtime task.
#[derive(Clone, Debug)]
pub struct QueueMessage {
    /// Message identifier supplied by the runtime.
    pub id: String,
    /// Raw message body.
    pub payload: Vec<u8>,
    /// Receipt handle supplied by the runtime (presumably for acknowledging the message — confirm).
    pub receipt_handle: String,
    /// Name of the source queue; also used to pick the handler.
    pub source: String,
    /// Delivery attempt counter supplied by the runtime.
    pub attempt_count: u32,
}
54
55/// Type alias for event handler functions
56type StorageHandler =
57    Box<dyn Fn(StorageEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
58type CronHandler =
59    Box<dyn Fn(CronEvent) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
60type QueueHandler =
61    Box<dyn Fn(QueueMessage) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
62type CommandHandler =
63    Box<dyn Fn(Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>> + Send + Sync>;
64
65/// Registered handlers
66struct Handlers {
67    storage: HashMap<String, StorageHandler>,
68    cron: HashMap<String, CronHandler>,
69    queue: HashMap<String, QueueHandler>,
70    command: HashMap<String, CommandHandler>,
71}
72
73impl Default for Handlers {
74    fn default() -> Self {
75        Self {
76            storage: HashMap::new(),
77            cron: HashMap::new(),
78            queue: HashMap::new(),
79            command: HashMap::new(),
80        }
81    }
82}
83
84/// Main context for Alien applications that provides access to:
85/// - Resource bindings (storage, kv, queue, vault, etc.)
86/// - Event handlers (storage events, cron events, queue messages, commands)
87/// - Background tasks (wait_until)
88/// - HTTP server registration
89pub struct AlienContext {
90    /// The wait_until context for managing background tasks
91    wait_until_context: Arc<WaitUntilContext>,
92    /// The bindings provider for accessing resources
93    bindings_provider: Arc<dyn BindingsProviderApi>,
94    /// Application ID
95    app_id: String,
96    /// Environment variables
97    env_vars: HashMap<String, String>,
98    /// Registered event handlers
99    handlers: Arc<RwLock<Handlers>>,
100    /// gRPC control client (lazy initialized)
101    #[cfg(feature = "grpc")]
102    control_client: Arc<Mutex<Option<ControlServiceClient<tonic::transport::Channel>>>>,
103}
104
105impl AlienContext {
106    /// Creates a new AlienContext from environment variables.
107    /// This automatically sets up gRPC communication and starts the drain listener.
108    pub async fn from_env() -> Result<Self> {
109        Self::from_env_with_vars(&std::env::vars().collect()).await
110    }
111
112    /// Creates a new AlienContext from provided environment variables.
113    /// This is useful for testing or when environment variables are not available via std::env.
114    pub async fn from_env_with_vars(env_vars: &HashMap<String, String>) -> Result<Self> {
115        // Choose the appropriate bindings provider based on ALIEN_BINDINGS_MODE
116        let bindings_mode = crate::get_bindings_mode_from_env(env_vars)?;
117
118        let bindings_provider: Arc<dyn BindingsProviderApi> = match bindings_mode {
119            BindingsMode::Grpc => {
120                #[cfg(feature = "grpc")]
121                {
122                    use crate::providers::grpc_provider::GrpcBindingsProvider;
123                    Arc::new(GrpcBindingsProvider::new_with_env(env_vars.clone())?)
124                }
125                #[cfg(not(feature = "grpc"))]
126                {
127                    return Err(AlienError::new(ErrorData::FeatureNotEnabled {
128                        feature: "grpc".to_string(),
129                    }));
130                }
131            }
132            BindingsMode::Direct => Arc::new(BindingsProvider::from_env(env_vars.clone()).await?),
133        };
134
135        let app_id = uuid::Uuid::new_v4().to_string();
136
137        let wait_until_context =
138            Arc::new(WaitUntilContext::from_env_with_vars(Some(app_id.clone()), env_vars).await?);
139
140        // Start the drain listener automatically
141        wait_until_context.start_drain_listener().await?;
142
143        Ok(Self {
144            wait_until_context,
145            bindings_provider,
146            app_id,
147            env_vars: env_vars.clone(),
148            handlers: Arc::new(RwLock::new(Handlers::default())),
149            #[cfg(feature = "grpc")]
150            control_client: Arc::new(Mutex::new(None)),
151        })
152    }
153
154    /// Creates a new AlienContext with custom provider and wait_until context.
155    /// This is mainly useful for testing or advanced use cases.
156    pub fn new(
157        wait_until_context: Arc<WaitUntilContext>,
158        bindings_provider: Arc<dyn BindingsProviderApi>,
159    ) -> Self {
160        Self {
161            app_id: wait_until_context.application_id().to_string(),
162            wait_until_context,
163            bindings_provider,
164            env_vars: std::env::vars().collect(),
165            handlers: Arc::new(RwLock::new(Handlers::default())),
166            #[cfg(feature = "grpc")]
167            control_client: Arc::new(Mutex::new(None)),
168        }
169    }
170
171    /// Gets the gRPC control client, creating it if needed
172    #[cfg(feature = "grpc")]
173    async fn get_control_client(&self) -> Result<ControlServiceClient<tonic::transport::Channel>> {
174        let mut client_guard = self.control_client.lock().await;
175
176        if let Some(client) = client_guard.as_ref() {
177            return Ok(client.clone());
178        }
179
180        let grpc_address = self
181            .env_vars
182            .get("ALIEN_BINDINGS_GRPC_ADDRESS")
183            .ok_or_else(|| {
184                AlienError::new(ErrorData::EnvironmentVariableMissing {
185                    variable_name: "ALIEN_BINDINGS_GRPC_ADDRESS".to_string(),
186                })
187            })?;
188
189        let endpoint = format!("http://{}", grpc_address);
190        let channel = tonic::transport::Channel::from_shared(endpoint.clone())
191            .into_alien_error()
192            .context(ErrorData::GrpcConnectionFailed {
193                endpoint: endpoint.clone(),
194                reason: "Invalid gRPC endpoint format".to_string(),
195            })?
196            .connect()
197            .await
198            .into_alien_error()
199            .context(ErrorData::GrpcConnectionFailed {
200                endpoint,
201                reason: "Failed to connect to gRPC server".to_string(),
202            })?;
203
204        let client = ControlServiceClient::new(channel);
205        *client_guard = Some(client.clone());
206        Ok(client)
207    }
208
209    // ==================== BINDINGS ====================
210
211    /// Gets the bindings provider for accessing storage, build, and other resources.
212    pub fn bindings(&self) -> &dyn BindingsProviderApi {
213        self.bindings_provider.as_ref()
214    }
215
216    /// Gets the bindings provider as an Arc
217    pub fn get_bindings(&self) -> Arc<dyn BindingsProviderApi> {
218        Arc::clone(&self.bindings_provider)
219    }
220
221    // ==================== EVENT HANDLERS ====================
222
223    /// Registers a handler for storage events on the specified bucket/resource.
224    ///
225    /// # Example
226    /// ```ignore
227    /// ctx.on_storage_event("uploads", |event| async move {
228    ///     println!("File {} was {}", event.key, event.event_type);
229    ///     Ok(())
230    /// });
231    /// ```
232    pub fn on_storage_event<F, Fut>(&self, resource: &str, handler: F)
233    where
234        F: Fn(StorageEvent) -> Fut + Send + Sync + 'static,
235        Fut: Future<Output = Result<()>> + Send + 'static,
236    {
237        let resource = resource.to_string();
238        let handler = Box::new(move |event: StorageEvent| {
239            let fut = handler(event);
240            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
241        });
242
243        let handlers = self.handlers.clone();
244        let resource_clone = resource.clone();
245
246        // Register in background to avoid blocking
247        tokio::spawn(async move {
248            let mut h = handlers.write().await;
249            h.storage.insert(resource_clone, handler);
250        });
251
252        info!(resource = %resource, "Registered storage event handler");
253    }
254
255    /// Registers a handler for cron/scheduled events.
256    ///
257    /// # Example
258    /// ```ignore
259    /// ctx.on_cron_event("daily-cleanup", |event| async move {
260    ///     cleanup_old_files().await;
261    ///     Ok(())
262    /// });
263    /// ```
264    pub fn on_cron_event<F, Fut>(&self, schedule: &str, handler: F)
265    where
266        F: Fn(CronEvent) -> Fut + Send + Sync + 'static,
267        Fut: Future<Output = Result<()>> + Send + 'static,
268    {
269        let schedule = schedule.to_string();
270        let handler = Box::new(move |event: CronEvent| {
271            let fut = handler(event);
272            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
273        });
274
275        let handlers = self.handlers.clone();
276        let schedule_clone = schedule.clone();
277
278        tokio::spawn(async move {
279            let mut h = handlers.write().await;
280            h.cron.insert(schedule_clone, handler);
281        });
282
283        info!(schedule = %schedule, "Registered cron event handler");
284    }
285
286    /// Registers a handler for queue messages.
287    ///
288    /// # Example
289    /// ```ignore
290    /// ctx.on_queue_message("tasks", |message| async move {
291    ///     process_task(&message.payload).await;
292    ///     Ok(())
293    /// });
294    /// ```
295    pub fn on_queue_message<F, Fut>(&self, queue: &str, handler: F)
296    where
297        F: Fn(QueueMessage) -> Fut + Send + Sync + 'static,
298        Fut: Future<Output = Result<()>> + Send + 'static,
299    {
300        let queue = queue.to_string();
301        let handler = Box::new(move |message: QueueMessage| {
302            let fut = handler(message);
303            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
304        });
305
306        let handlers = self.handlers.clone();
307        let queue_clone = queue.clone();
308
309        tokio::spawn(async move {
310            let mut h = handlers.write().await;
311            h.queue.insert(queue_clone, handler);
312        });
313
314        info!(queue = %queue, "Registered queue message handler");
315    }
316
317    /// Registers a command handler for ARC remote calls.
318    ///
319    /// # Example
320    /// ```ignore
321    /// ctx.on_command::<GenerateReportParams, ReportResult>("generate-report", |params| async move {
322    ///     let report = generate_report(params.start_date, params.end_date).await?;
323    ///     Ok(report)
324    /// });
325    /// ```
326    pub fn on_command<P, R, F, Fut>(&self, command: &str, handler: F)
327    where
328        P: DeserializeOwned + Send + 'static,
329        R: Serialize + Send + 'static,
330        F: Fn(P) -> Fut + Send + Sync + 'static,
331        Fut: Future<Output = Result<R>> + Send + 'static,
332    {
333        let command = command.to_string();
334        let handler = Box::new(move |params_bytes: Vec<u8>| {
335            // Deserialize params
336            let params: P = match serde_json::from_slice(&params_bytes) {
337                Ok(p) => p,
338                Err(e) => {
339                    return Box::pin(async move {
340                        Err(AlienError::new(ErrorData::DeserializationFailed {
341                            message: format!("Failed to deserialize command params: {}", e),
342                            type_name: std::any::type_name::<P>().to_string(),
343                        }))
344                    })
345                        as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>;
346                }
347            };
348
349            let fut = handler(params);
350            Box::pin(async move {
351                let result = fut.await?;
352                serde_json::to_vec(&result).into_alien_error().context(
353                    ErrorData::SerializationFailed {
354                        message: "Failed to serialize command result".to_string(),
355                    },
356                )
357            }) as Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send>>
358        });
359
360        let handlers = self.handlers.clone();
361        let command_clone = command.clone();
362
363        tokio::spawn(async move {
364            let mut h = handlers.write().await;
365            h.command.insert(command_clone, handler);
366        });
367
368        info!(command = %command, "Registered command handler");
369    }
370
371    // ==================== HTTP SERVER ====================
372
373    /// Registers the application's HTTP server port with the runtime.
374    /// The runtime will forward HTTP requests to this port.
375    ///
376    /// # Example
377    /// ```ignore
378    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
379    /// let port = listener.local_addr()?.port();
380    /// ctx.register_http_server(port).await?;
381    /// ```
382    #[cfg(feature = "grpc")]
383    pub async fn register_http_server(&self, port: u16) -> Result<()> {
384        info!(port = port, "Registering HTTP server with runtime");
385
386        let mut client = self.get_control_client().await?;
387
388        let request = tonic::Request::new(RegisterHttpServerRequest { port: port as u32 });
389
390        client
391            .register_http_server(request)
392            .await
393            .into_alien_error()
394            .context(ErrorData::GrpcCallFailed {
395                service: "ControlService".to_string(),
396                method: "RegisterHttpServer".to_string(),
397                reason: "gRPC call failed".to_string(),
398            })?;
399
400        info!(port = port, "HTTP server registered successfully");
401        Ok(())
402    }
403
404    #[cfg(not(feature = "grpc"))]
405    pub async fn register_http_server(&self, _port: u16) -> Result<()> {
406        Err(AlienError::new(ErrorData::FeatureNotEnabled {
407            feature: "grpc".to_string(),
408        }))
409    }
410
411    // ==================== EVENT LOOP ====================
412
413    /// Enters the main event loop and processes events from the runtime.
414    /// This blocks until shutdown is signaled.
415    ///
416    /// Call this after registering all event handlers.
417    ///
418    /// # Example
419    /// ```ignore
420    /// ctx.on_storage_event("uploads", handler);
421    /// ctx.on_command("process", cmd_handler);
422    /// ctx.run().await?;
423    /// ```
424    #[cfg(feature = "grpc")]
425    pub async fn run(&self) -> Result<()> {
426        info!(app_id = %self.app_id, "Entering event loop");
427
428        // Register handlers with runtime
429        self.register_handlers_with_runtime().await?;
430
431        // Get control client and start task stream
432        let mut client = self.get_control_client().await?;
433
434        let request = tonic::Request::new(WaitForTasksRequest {
435            application_id: self.app_id.clone(),
436        });
437
438        let mut stream = client
439            .wait_for_tasks(request)
440            .await
441            .into_alien_error()
442            .context(ErrorData::GrpcCallFailed {
443                service: "ControlService".to_string(),
444                method: "WaitForTasks".to_string(),
445                reason: "Failed to start task stream".to_string(),
446            })?
447            .into_inner();
448
449        info!("Task stream established, waiting for tasks");
450
451        // Process tasks until stream ends
452        while let Some(task_result) = stream.next().await {
453            match task_result {
454                Ok(task) => {
455                    if let Err(e) = self.handle_task(task).await {
456                        error!(error = %e, "Error handling task");
457                    }
458                }
459                Err(status) => {
460                    if status.code() == tonic::Code::Cancelled {
461                        info!("Task stream cancelled, shutting down");
462                        break;
463                    }
464                    error!(error = %status, "Error receiving task from stream");
465                }
466            }
467        }
468
469        info!("Task loop ended");
470        Ok(())
471    }
472
473    #[cfg(not(feature = "grpc"))]
474    pub async fn run(&self) -> Result<()> {
475        Err(AlienError::new(ErrorData::FeatureNotEnabled {
476            feature: "grpc".to_string(),
477        }))
478    }
479
480    /// Register all handlers with the runtime
481    #[cfg(feature = "grpc")]
482    async fn register_handlers_with_runtime(&self) -> Result<()> {
483        let handlers = self.handlers.read().await;
484        let mut client = self.get_control_client().await?;
485
486        // Register storage handlers
487        for resource in handlers.storage.keys() {
488            let request = tonic::Request::new(RegisterEventHandlerRequest {
489                handler_type: "storage".to_string(),
490                resource_name: resource.clone(),
491            });
492            client
493                .register_event_handler(request)
494                .await
495                .into_alien_error()
496                .context(ErrorData::GrpcCallFailed {
497                    service: "ControlService".to_string(),
498                    method: "RegisterEventHandler".to_string(),
499                    reason: "Failed to register storage handler".to_string(),
500                })?;
501            debug!(handler_type = "storage", resource = %resource, "Registered handler with runtime");
502        }
503
504        // Register cron handlers
505        for schedule in handlers.cron.keys() {
506            let request = tonic::Request::new(RegisterEventHandlerRequest {
507                handler_type: "cron".to_string(),
508                resource_name: schedule.clone(),
509            });
510            client
511                .register_event_handler(request)
512                .await
513                .into_alien_error()
514                .context(ErrorData::GrpcCallFailed {
515                    service: "ControlService".to_string(),
516                    method: "RegisterEventHandler".to_string(),
517                    reason: "Failed to register cron handler".to_string(),
518                })?;
519            debug!(handler_type = "cron", resource = %schedule, "Registered handler with runtime");
520        }
521
522        // Register queue handlers
523        for queue in handlers.queue.keys() {
524            let request = tonic::Request::new(RegisterEventHandlerRequest {
525                handler_type: "queue".to_string(),
526                resource_name: queue.clone(),
527            });
528            client
529                .register_event_handler(request)
530                .await
531                .into_alien_error()
532                .context(ErrorData::GrpcCallFailed {
533                    service: "ControlService".to_string(),
534                    method: "RegisterEventHandler".to_string(),
535                    reason: "Failed to register queue handler".to_string(),
536                })?;
537            debug!(handler_type = "queue", resource = %queue, "Registered handler with runtime");
538        }
539
540        // Register command handlers
541        for command in handlers.command.keys() {
542            let request = tonic::Request::new(RegisterEventHandlerRequest {
543                handler_type: "command".to_string(),
544                resource_name: command.clone(),
545            });
546            client
547                .register_event_handler(request)
548                .await
549                .into_alien_error()
550                .context(ErrorData::GrpcCallFailed {
551                    service: "ControlService".to_string(),
552                    method: "RegisterEventHandler".to_string(),
553                    reason: "Failed to register command handler".to_string(),
554                })?;
555            debug!(handler_type = "command", resource = %command, "Registered handler with runtime");
556        }
557
558        Ok(())
559    }
560
561    /// Handle a single task from the runtime
562    #[cfg(feature = "grpc")]
563    async fn handle_task(&self, task: Task) -> Result<()> {
564        let task_id = task.task_id.clone();
565        debug!(task_id = %task_id, "Handling task");
566
567        // Check if it's a command before consuming the payload
568        let is_command = matches!(&task.payload, Some(TaskPayload::ArcCommand(_)));
569
570        let result = match task.payload {
571            Some(TaskPayload::StorageEvent(se)) => {
572                self.handle_storage_event(
573                    &se.bucket,
574                    StorageEvent {
575                        key: se.key,
576                        event_type: se.event_type,
577                        bucket: se.bucket.clone(),
578                        size: se.size,
579                        content_type: se.content_type,
580                    },
581                )
582                .await
583            }
584            Some(TaskPayload::CronEvent(ce)) => {
585                let scheduled_time = ce
586                    .scheduled_time
587                    .map(|ts| {
588                        chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32)
589                            .unwrap_or_else(chrono::Utc::now)
590                    })
591                    .unwrap_or_else(chrono::Utc::now);
592
593                self.handle_cron_event(
594                    &ce.schedule_name,
595                    CronEvent {
596                        schedule_name: ce.schedule_name.clone(),
597                        scheduled_time,
598                    },
599                )
600                .await
601            }
602            Some(TaskPayload::QueueMessage(qm)) => {
603                self.handle_queue_message(
604                    &qm.source,
605                    QueueMessage {
606                        id: qm.id,
607                        payload: qm.payload,
608                        receipt_handle: qm.receipt_handle,
609                        source: qm.source.clone(),
610                        attempt_count: qm.attempt_count,
611                    },
612                )
613                .await
614            }
615            Some(TaskPayload::ArcCommand(cmd)) => {
616                self.handle_command(&task_id, &cmd.command_name, cmd.params, &cmd.response_url)
617                    .await
618            }
619            None => {
620                warn!(task_id = %task_id, "Received task with no payload");
621                Ok(())
622            }
623        };
624
625        // For non-command tasks, send result to runtime
626        if !is_command {
627            self.send_task_result(&task_id, result).await?;
628        }
629
630        Ok(())
631    }
632
633    async fn handle_storage_event(&self, bucket: &str, event: StorageEvent) -> Result<()> {
634        let handlers = self.handlers.read().await;
635        // Try exact match first, then wildcard
636        if let Some(handler) = handlers
637            .storage
638            .get(bucket)
639            .or_else(|| handlers.storage.get("*"))
640        {
641            handler(event).await
642        } else {
643            warn!(bucket = %bucket, "No handler registered for storage event");
644            Ok(())
645        }
646    }
647
648    async fn handle_cron_event(&self, schedule: &str, event: CronEvent) -> Result<()> {
649        let handlers = self.handlers.read().await;
650        // Try exact match first, then wildcard
651        if let Some(handler) = handlers
652            .cron
653            .get(schedule)
654            .or_else(|| handlers.cron.get("*"))
655        {
656            handler(event).await
657        } else {
658            warn!(schedule = %schedule, "No handler registered for cron event");
659            Ok(())
660        }
661    }
662
663    async fn handle_queue_message(&self, queue: &str, message: QueueMessage) -> Result<()> {
664        let handlers = self.handlers.read().await;
665        // Try exact match first, then wildcard
666        if let Some(handler) = handlers
667            .queue
668            .get(queue)
669            .or_else(|| handlers.queue.get("*"))
670        {
671            handler(message).await
672        } else {
673            warn!(queue = %queue, "No handler registered for queue message");
674            Ok(())
675        }
676    }
677
678    #[cfg(feature = "grpc")]
679    async fn handle_command(
680        &self,
681        event_id: &str,
682        command: &str,
683        params: Vec<u8>,
684        _response_url: &str,
685    ) -> Result<()> {
686        let handlers = self.handlers.read().await;
687
688        let result = if let Some(handler) = handlers.command.get(command) {
689            match handler(params).await {
690                Ok(response_data) => {
691                    // Send success response
692                    self.send_command_response(event_id, Ok(response_data))
693                        .await
694                }
695                Err(e) => {
696                    // Send error response
697                    self.send_command_response(event_id, Err(e.to_string()))
698                        .await
699                }
700            }
701        } else {
702            warn!(command = %command, "No handler registered for command");
703            self.send_command_response(
704                event_id,
705                Err(format!("No handler for command: {}", command)),
706            )
707            .await
708        };
709
710        result
711    }
712
713    #[cfg(feature = "grpc")]
714    async fn send_task_result(&self, task_id: &str, result: Result<()>) -> Result<()> {
715        let mut client = self.get_control_client().await?;
716
717        let task_result = match result {
718            Ok(()) => TaskResult::Success(TaskSuccess {
719                response_data: vec![],
720            }),
721            Err(e) => TaskResult::Error(TaskError {
722                code: "HANDLER_ERROR".to_string(),
723                message: e.to_string(),
724            }),
725        };
726
727        let request = tonic::Request::new(SendTaskResultRequest {
728            task_id: task_id.to_string(),
729            result: Some(task_result),
730        });
731
732        client
733            .send_task_result(request)
734            .await
735            .into_alien_error()
736            .context(ErrorData::GrpcCallFailed {
737                service: "ControlService".to_string(),
738                method: "SendTaskResult".to_string(),
739                reason: "Failed to send task result".to_string(),
740            })?;
741
742        Ok(())
743    }
744
745    #[cfg(feature = "grpc")]
746    async fn send_command_response(
747        &self,
748        task_id: &str,
749        result: std::result::Result<Vec<u8>, String>,
750    ) -> Result<()> {
751        let mut client = self.get_control_client().await?;
752
753        let task_result = match result {
754            Ok(data) => TaskResult::Success(TaskSuccess {
755                response_data: data,
756            }),
757            Err(e) => TaskResult::Error(TaskError {
758                code: "COMMAND_ERROR".to_string(),
759                message: e,
760            }),
761        };
762
763        let request = tonic::Request::new(SendTaskResultRequest {
764            task_id: task_id.to_string(),
765            result: Some(task_result),
766        });
767
768        client
769            .send_task_result(request)
770            .await
771            .into_alien_error()
772            .context(ErrorData::GrpcCallFailed {
773                service: "ControlService".to_string(),
774                method: "SendTaskResult".to_string(),
775                reason: "Failed to send command response".to_string(),
776            })?;
777
778        eprintln!("[ALIEN_CONTEXT] send_task_result succeeded");
779        Ok(())
780    }
781
782    // ==================== WAIT UNTIL ====================
783
784    /// Registers a background task that will run even after the main handler returns.
785    /// The task runs in the application process and is tracked by the runtime for proper shutdown coordination.
786    pub fn wait_until<F, Fut>(&self, task_fn: F) -> Result<()>
787    where
788        F: FnOnce() -> Fut + Send + 'static,
789        Fut: std::future::Future<Output = ()> + Send + 'static,
790    {
791        self.wait_until_context.wait_until(task_fn)
792    }
793
794    // ==================== UTILITIES ====================
795
796    /// Gets the application ID for this context.
797    pub fn application_id(&self) -> &str {
798        &self.app_id
799    }
800
801    /// Gets the current number of registered wait_until tasks.
802    pub async fn get_task_count(&self) -> Result<u32> {
803        self.wait_until_context.get_task_count().await
804    }
805
806    /// Gets the current function binding if available.
807    pub async fn get_current_function(&self) -> Result<Option<Arc<dyn crate::traits::Function>>> {
808        if let Some(current_function_name) =
809            self.env_vars.get("ALIEN_CURRENT_FUNCTION_BINDING_NAME")
810        {
811            Ok(Some(
812                self.bindings_provider
813                    .load_function(current_function_name)
814                    .await?,
815            ))
816        } else {
817            Ok(None)
818        }
819    }
820
821    /// Gets the current container binding if available.
822    ///
823    /// This allows a container to discover its own public and internal URLs.
824    /// Useful for constructing callback URLs, OAuth redirects, etc.
825    ///
826    /// # Example
827    /// ```ignore
828    /// let ctx = AlienContext::from_env().await?;
829    /// if let Some(container) = ctx.get_current_container().await? {
830    ///     let public_url = container.get_public_url();
831    ///     let callback_url = format!("{}/callback", public_url.unwrap_or(""));
832    /// }
833    /// ```
834    pub async fn get_current_container(&self) -> Result<Option<Arc<dyn crate::traits::Container>>> {
835        if let Some(current_container_name) =
836            self.env_vars.get("ALIEN_CURRENT_CONTAINER_BINDING_NAME")
837        {
838            Ok(Some(
839                self.bindings_provider
840                    .load_container(current_container_name)
841                    .await?,
842            ))
843        } else {
844            Ok(None)
845        }
846    }
847}