runpod_sdk/serverless/client.rs
1//! Serverless endpoint runner for running serverless jobs
2
3use std::sync::Arc;
4
5use serde::Serialize;
6
7use super::job::ServerlessJob;
8use super::types::EndpointHealth;
9use crate::{Result, RunpodClient};
10
/// Target name attached to the `tracing` events emitted from this module.
///
/// Only compiled when the `tracing` feature is enabled.
#[cfg(feature = "tracing")]
const TRACING_TARGET: &str = "runpod_sdk::serverless";
13
/// Handle for running jobs on a specific serverless endpoint.
///
/// The handle stores the endpoint ID together with the `RunpodClient` used to
/// send requests. Cloning the handle clones the client and shares the endpoint
/// ID through its `Arc`.
///
/// # Examples
///
/// ```no_run
/// use runpod_sdk::{RunpodClient, Result};
/// use runpod_sdk::serverless::ServerlessEndpoint;
/// use serde_json::json;
///
/// # async fn example() -> Result<()> {
/// let client = RunpodClient::from_env()?;
/// let endpoint = ServerlessEndpoint::new("YOUR_ENDPOINT_ID", client);
///
/// let job = endpoint.run(&json!({"prompt": "Hello, world!"}))?;
/// let output: serde_json::Value = job.await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct ServerlessEndpoint {
    /// Identifier of the endpoint; held in an `Arc` so it can be handed to the
    /// jobs created by `run` without copying the string.
    endpoint_id: Arc<String>,
    /// Client used to send the requests for this endpoint.
    client: RunpodClient,
}
37
38impl ServerlessEndpoint {
39 /// Creates a new Endpoint instance.
40 ///
41 /// # Arguments
42 ///
43 /// * `endpoint_id` - The unique identifier for the serverless endpoint
44 /// * `client` - Reference to the RunpodClient
45 ///
46 /// # Example
47 ///
48 /// ```no_run
49 /// # use runpod_sdk::{RunpodClient, Result};
50 /// # use runpod_sdk::serverless::ServerlessEndpoint;
51 /// # fn example() -> Result<()> {
52 /// let client = RunpodClient::from_env()?;
53 /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
54 /// # Ok(())
55 /// # }
56 /// ```
57 pub fn new(endpoint_id: impl Into<String>, client: RunpodClient) -> Self {
58 Self {
59 endpoint_id: Arc::new(endpoint_id.into()),
60 client,
61 }
62 }
63
64 /// Returns the endpoint ID.
65 pub fn endpoint_id(&self) -> &str {
66 &self.endpoint_id
67 }
68
69 /// Runs a job on the endpoint.
70 ///
71 /// # Arguments
72 ///
73 /// * `input` - The input payload for the job
74 ///
75 /// # Returns
76 ///
77 /// Returns a Job instance that implements Future for retrieving results.
78 /// The job submission happens when you first poll the Job (e.g., by awaiting it).
79 ///
80 /// # Example
81 ///
82 /// ```no_run
83 /// # use runpod_sdk::{RunpodClient, Result};
84 /// # use runpod_sdk::serverless::ServerlessEndpoint;
85 /// # use serde::{Deserialize, Serialize};
86 /// # use serde_json::json;
87 /// #
88 /// # #[derive(Serialize)]
89 /// # struct Input {
90 /// # prompt: String,
91 /// # }
92 /// #
93 /// # async fn example() -> Result<()> {
94 /// let client = RunpodClient::from_env()?;
95 /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
96 ///
97 /// let job = endpoint.run(&Input {
98 /// prompt: "Hello, World!".to_string()
99 /// })?;
100 ///
101 /// let output: serde_json::Value = job.await?;
102 /// println!("Job result: {:?}", output);
103 /// # Ok(())
104 /// # }
105 /// ```
106 pub fn run<I>(&self, input: &I) -> Result<ServerlessJob>
107 where
108 I: Serialize,
109 {
110 let input_value = serde_json::to_value(input)?;
111
112 Ok(ServerlessJob::new(
113 Arc::clone(&self.endpoint_id),
114 input_value,
115 self.client.clone(),
116 ))
117 }
118
119 /// Runs a job and immediately waits for the result.
120 ///
121 /// This is a convenience method that runs a job and awaits its completion.
122 ///
123 /// # Example
124 ///
125 /// ```no_run
126 /// # use runpod_sdk::{RunpodClient, Result};
127 /// # use runpod_sdk::serverless::ServerlessEndpoint;
128 /// # use serde_json::json;
129 /// # async fn example() -> Result<()> {
130 /// let client = RunpodClient::from_env()?;
131 /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
132 ///
133 /// let output: serde_json::Value = endpoint.run_now(&json!({"prompt": "Hello"})).await?;
134 /// println!("Result: {:?}", output);
135 /// # Ok(())
136 /// # }
137 /// ```
138 pub async fn run_now<I, O>(&self, input: &I) -> Result<O>
139 where
140 I: Serialize,
141 O: serde::de::DeserializeOwned,
142 {
143 let job = self.run(input)?;
144 let value = job.await?;
145 Ok(serde_json::from_value(value)?)
146 }
147
148 /// Checks the health of the endpoint.
149 ///
150 /// # Example
151 ///
152 /// ```no_run
153 /// # use runpod_sdk::{RunpodClient, Result};
154 /// # use runpod_sdk::serverless::ServerlessEndpoint;
155 /// # async fn example() -> Result<()> {
156 /// let client = RunpodClient::from_env()?;
157 /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
158 ///
159 /// let health = endpoint.health().await?;
160 /// println!("Workers ready: {}", health.workers.ready);
161 /// println!("Jobs in queue: {}", health.jobs.in_queue);
162 /// # Ok(())
163 /// # }
164 /// ```
165 pub async fn health(&self) -> Result<EndpointHealth> {
166 #[cfg(feature = "tracing")]
167 tracing::debug!(
168 target: TRACING_TARGET,
169 endpoint_id = %self.endpoint_id,
170 "Checking endpoint health"
171 );
172
173 let path = format!("{}/health", self.endpoint_id);
174
175 let response = self.client.get_api(&path).send().await?;
176 let response = response.error_for_status()?;
177 let health: EndpointHealth = response.json().await?;
178
179 #[cfg(feature = "tracing")]
180 tracing::debug!(
181 target: TRACING_TARGET,
182 endpoint_id = %self.endpoint_id,
183 workers_ready = health.workers.ready,
184 jobs_in_queue = health.jobs.in_queue,
185 "Endpoint health retrieved"
186 );
187
188 Ok(health)
189 }
190
191 /// Purges all jobs from the endpoint's queue.
192 ///
193 /// # Example
194 ///
195 /// ```no_run
196 /// # use runpod_sdk::{RunpodClient, Result};
197 /// # use runpod_sdk::serverless::ServerlessEndpoint;
198 /// # async fn example() -> Result<()> {
199 /// let client = RunpodClient::from_env()?;
200 /// let endpoint = ServerlessEndpoint::new("ENDPOINT_ID", client);
201 ///
202 /// endpoint.purge_queue().await?;
203 /// println!("Queue purged");
204 /// # Ok(())
205 /// # }
206 /// ```
207 pub async fn purge_queue(&self) -> Result<()> {
208 #[cfg(feature = "tracing")]
209 tracing::debug!(
210 target: TRACING_TARGET,
211 endpoint_id = %self.endpoint_id,
212 "Purging endpoint queue"
213 );
214
215 let path = format!("{}/purge-queue", self.endpoint_id);
216
217 let response = self.client.post_api(&path).send().await?;
218 response.error_for_status()?;
219
220 #[cfg(feature = "tracing")]
221 tracing::info!(
222 target: TRACING_TARGET,
223 endpoint_id = %self.endpoint_id,
224 "Endpoint queue purged successfully"
225 );
226
227 Ok(())
228 }
229}
230
231impl std::fmt::Debug for ServerlessEndpoint {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 f.debug_struct("Endpoint")
234 .field("endpoint_id", &self.endpoint_id)
235 .finish()
236 }
237}