runpod_sdk/serverless/client.rs

//! Serverless endpoint client for submitting jobs and managing RunPod serverless endpoints.
2
3use std::sync::Arc;
4
5use serde::Serialize;
6
7use super::job::ServerlessJob;
8use super::types::EndpointHealth;
9use crate::{Result, RunpodClient};
10
/// `tracing` target attached to every event emitted by this module.
#[cfg(feature = "tracing")]
const TRACING_TARGET: &str = "runpod_sdk::serverless";
13
14/// Class for running jobs on a specific endpoint.
15///
16/// # Examples
17///
18/// ```no_run
19/// use runpod_sdk::{RunpodClient, Result};
20/// use runpod_sdk::serverless::ServerlessEndpoint;
21/// use serde_json::json;
22///
23/// # async fn example() -> Result<()> {
24/// let client = RunpodClient::from_env()?;
25/// let endpoint = ServerlessEndpoint::new("YOUR_ENDPOINT_ID", client);
26///
27/// let job = endpoint.run(&json!({"prompt": "Hello, world!"}))?;
28/// let output: serde_json::Value = job.await?;
29/// # Ok(())
30/// # }
31/// ```
32#[derive(Clone)]
33pub struct ServerlessEndpoint {
34    endpoint_id: Arc<String>,
35    client: RunpodClient,
36}
37
38impl ServerlessEndpoint {
39    /// Creates a new Endpoint instance.
40    ///
41    /// # Arguments
42    ///
43    /// * `endpoint_id` - The unique identifier for the serverless endpoint
44    /// * `client` - Reference to the RunpodClient
45    ///
46    /// # Example
47    ///
48    /// ```no_run
49    /// # use runpod_sdk::{RunpodClient, Result};
50    /// # use runpod_sdk::serverless::ServerlessEndpoint;
51    /// # fn example() -> Result<()> {
52    /// let client = RunpodClient::from_env()?;
53    /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
54    /// # Ok(())
55    /// # }
56    /// ```
57    pub fn new(endpoint_id: impl Into<String>, client: RunpodClient) -> Self {
58        Self {
59            endpoint_id: Arc::new(endpoint_id.into()),
60            client,
61        }
62    }
63
64    /// Returns the endpoint ID.
65    pub fn endpoint_id(&self) -> &str {
66        &self.endpoint_id
67    }
68
69    /// Runs a job on the endpoint.
70    ///
71    /// # Arguments
72    ///
73    /// * `input` - The input payload for the job
74    ///
75    /// # Returns
76    ///
77    /// Returns a Job instance that implements Future for retrieving results.
78    /// The job submission happens when you first poll the Job (e.g., by awaiting it).
79    ///
80    /// # Example
81    ///
82    /// ```no_run
83    /// # use runpod_sdk::{RunpodClient, Result};
84    /// # use runpod_sdk::serverless::ServerlessEndpoint;
85    /// # use serde::{Deserialize, Serialize};
86    /// # use serde_json::json;
87    /// #
88    /// # #[derive(Serialize)]
89    /// # struct Input {
90    /// #     prompt: String,
91    /// # }
92    /// #
93    /// # async fn example() -> Result<()> {
94    /// let client = RunpodClient::from_env()?;
95    /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
96    ///
97    /// let job = endpoint.run(&Input {
98    ///     prompt: "Hello, World!".to_string()
99    /// })?;
100    ///
101    /// let output: serde_json::Value = job.await?;
102    /// println!("Job result: {:?}", output);
103    /// # Ok(())
104    /// # }
105    /// ```
106    pub fn run<I>(&self, input: &I) -> Result<ServerlessJob>
107    where
108        I: Serialize,
109    {
110        let input_value = serde_json::to_value(input)?;
111
112        Ok(ServerlessJob::new(
113            Arc::clone(&self.endpoint_id),
114            input_value,
115            self.client.clone(),
116        ))
117    }
118
119    /// Runs a job and immediately waits for the result.
120    ///
121    /// This is a convenience method that runs a job and awaits its completion.
122    ///
123    /// # Example
124    ///
125    /// ```no_run
126    /// # use runpod_sdk::{RunpodClient, Result};
127    /// # use runpod_sdk::serverless::ServerlessEndpoint;
128    /// # use serde_json::json;
129    /// # async fn example() -> Result<()> {
130    /// let client = RunpodClient::from_env()?;
131    /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
132    ///
133    /// let output: serde_json::Value = endpoint.run_now(&json!({"prompt": "Hello"})).await?;
134    /// println!("Result: {:?}", output);
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub async fn run_now<I, O>(&self, input: &I) -> Result<O>
139    where
140        I: Serialize,
141        O: serde::de::DeserializeOwned,
142    {
143        let job = self.run(input)?;
144        let value = job.await?;
145        Ok(serde_json::from_value(value)?)
146    }
147
148    /// Checks the health of the endpoint.
149    ///
150    /// # Example
151    ///
152    /// ```no_run
153    /// # use runpod_sdk::{RunpodClient, Result};
154    /// # use runpod_sdk::serverless::ServerlessEndpoint;
155    /// # async fn example() -> Result<()> {
156    /// let client = RunpodClient::from_env()?;
157    /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
158    ///
159    /// let health = endpoint.health().await?;
160    /// println!("Workers ready: {}", health.workers.ready);
161    /// println!("Jobs in queue: {}", health.jobs.in_queue);
162    /// # Ok(())
163    /// # }
164    /// ```
165    pub async fn health(&self) -> Result<EndpointHealth> {
166        #[cfg(feature = "tracing")]
167        tracing::debug!(
168            target: TRACING_TARGET,
169            endpoint_id = %self.endpoint_id,
170            "Checking endpoint health"
171        );
172
173        let path = format!("{}/health", self.endpoint_id);
174
175        let response = self.client.get_api(&path).send().await?;
176        let response = response.error_for_status()?;
177        let health: EndpointHealth = response.json().await?;
178
179        #[cfg(feature = "tracing")]
180        tracing::debug!(
181            target: TRACING_TARGET,
182            endpoint_id = %self.endpoint_id,
183            workers_ready = health.workers.ready,
184            jobs_in_queue = health.jobs.in_queue,
185            "Endpoint health retrieved"
186        );
187
188        Ok(health)
189    }
190
191    /// Purges all jobs from the endpoint's queue.
192    ///
193    /// # Example
194    ///
195    /// ```no_run
196    /// # use runpod_sdk::{RunpodClient, Result};
197    /// # use runpod_sdk::serverless::ServerlessEndpoint;
198    /// # async fn example() -> Result<()> {
199    /// let client = RunpodClient::from_env()?;
200    /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
201    ///
202    /// endpoint.purge_queue().await?;
203    /// println!("Queue purged");
204    /// # Ok(())
205    /// # }
206    /// ```
207    pub async fn purge_queue(&self) -> Result<()> {
208        #[cfg(feature = "tracing")]
209        tracing::debug!(
210            target: TRACING_TARGET,
211            endpoint_id = %self.endpoint_id,
212            "Purging endpoint queue"
213        );
214
215        let path = format!("{}/purge-queue", self.endpoint_id);
216
217        let response = self.client.post_api(&path).send().await?;
218        response.error_for_status()?;
219
220        #[cfg(feature = "tracing")]
221        tracing::info!(
222            target: TRACING_TARGET,
223            endpoint_id = %self.endpoint_id,
224            "Endpoint queue purged successfully"
225        );
226
227        Ok(())
228    }
229}
230
231impl std::fmt::Debug for ServerlessEndpoint {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        f.debug_struct("Endpoint")
234            .field("endpoint_id", &self.endpoint_id)
235            .finish()
236    }
237}