mod error;
mod types;

use std::sync::Arc;
use std::time::Duration;

use eventsource_client::{Client, ClientBuilder, SSE};
use futures::StreamExt;
use tokio::sync::Mutex;

pub use error::{OpenPondError, Result};
pub use types::*;
11
12#[derive(Clone)]
20pub struct OpenPondSDK {
21 client: reqwest::Client,
22 config: OpenPondConfig,
23 message_callback: Arc<Mutex<Option<Box<dyn Fn(Message) + Send + Sync>>>>,
24 error_callback: Arc<Mutex<Option<Box<dyn Fn(OpenPondError) + Send + Sync>>>>,
25}
26
27impl OpenPondSDK {
28 pub fn new(config: OpenPondConfig) -> Self {
30 let mut headers = reqwest::header::HeaderMap::new();
31 headers.insert(
32 reqwest::header::CONTENT_TYPE,
33 "application/json".parse().unwrap(),
34 );
35
36 if let Some(api_key) = &config.api_key {
37 headers.insert(
38 "X-API-Key",
39 api_key.parse().unwrap(),
40 );
41 }
42
43 let client = reqwest::Client::builder()
44 .default_headers(headers)
45 .build()
46 .unwrap();
47
48 Self {
49 client,
50 config,
51 message_callback: Arc::new(Mutex::new(None)),
52 error_callback: Arc::new(Mutex::new(None)),
53 }
54 }
55
56 pub async fn on_message<F>(&self, callback: F)
58 where
59 F: Fn(Message) + Send + Sync + 'static,
60 {
61 let mut cb = self.message_callback.lock().await;
62 *cb = Some(Box::new(callback));
63 }
64
65 pub async fn on_error<F>(&self, callback: F)
67 where
68 F: Fn(OpenPondError) + Send + Sync + 'static,
69 {
70 let mut cb = self.error_callback.lock().await;
71 *cb = Some(Box::new(callback));
72 }
73
74 pub async fn start(&self) -> Result<()> {
76 self.register_agent().await?;
78
79 let url = format!("{}/messages/stream", self.config.api_url);
81 let mut builder = ClientBuilder::for_url(&url).map_err(|_| OpenPondError::SSEError)?;
82
83 builder = builder
85 .header("Accept", "text/event-stream")
86 .map_err(|_| OpenPondError::SSEError)?;
87
88 if let Some(private_key) = &self.config.private_key {
90 let timestamp = chrono::Utc::now().timestamp_millis().to_string();
91 let message = format!("Authenticate to OpenPond API at timestamp {}", timestamp);
92
93 builder = builder
94 .header("X-Agent-Id", private_key)
95 .map_err(|_| OpenPondError::SSEError)?
96 .header("X-Timestamp", ×tamp)
97 .map_err(|_| OpenPondError::SSEError)?;
98
99 }
102
103 if let Some(api_key) = &self.config.api_key {
104 builder = builder
105 .header("X-API-Key", api_key)
106 .map_err(|_| OpenPondError::SSEError)?;
107 }
108
109 let message_callback = self.message_callback.clone();
111 let error_callback = self.error_callback.clone();
112 let agent_id = self.config.private_key.clone();
113
114 tokio::spawn(async move {
116 let client = builder.build();
117 let mut stream = client.stream();
118
119 while let Some(event) = stream.next().await {
120 match event {
121 Ok(SSE::Event(event)) => {
122 if let Ok(msg) = serde_json::from_str::<Message>(&event.data) {
123 if let Some(ref our_id) = agent_id {
125 if msg.to_agent_id == *our_id {
126 if let Some(cb) = message_callback.lock().await.as_ref() {
127 cb(msg);
128 }
129 }
130 }
131 }
132 }
133 Err(e) => {
134 if let Some(cb) = error_callback.lock().await.as_ref() {
135 cb(OpenPondError::from(e));
136 }
137 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
139 }
140 _ => {} }
142 }
143 });
144
145 Ok(())
146 }
147
148 pub async fn stop(&self) -> Result<()> {
150 Ok(())
152 }
153
154 pub async fn send_message(
156 &self,
157 to_agent_id: &str,
158 content: &str,
159 options: Option<SendMessageOptions>,
160 ) -> Result<String> {
161 let response = self.client
162 .post(&format!("{}/messages", self.config.api_url))
163 .json(&serde_json::json!({
164 "toAgentId": to_agent_id,
165 "content": content,
166 "privateKey": self.config.private_key,
167 "options": options,
168 }))
169 .send()
170 .await?;
171
172 if !response.status().is_success() {
173 return Err(OpenPondError::ApiError {
174 status: response.status().as_u16(),
175 message: response.text().await?,
176 });
177 }
178
179 let data: serde_json::Value = response.json().await?;
180 Ok(data["messageId"].as_str().unwrap_or_default().to_string())
181 }
182
183 pub async fn get_agent(&self, agent_id: &str) -> Result<Agent> {
185 let response = self.client
186 .get(&format!("{}/agents/{}", self.config.api_url, agent_id))
187 .send()
188 .await?;
189
190 if !response.status().is_success() {
191 return Err(OpenPondError::ApiError {
192 status: response.status().as_u16(),
193 message: response.text().await?,
194 });
195 }
196
197 Ok(response.json().await?)
198 }
199
200 pub async fn list_agents(&self) -> Result<Vec<Agent>> {
202 let response = self.client
203 .get(&format!("{}/agents", self.config.api_url))
204 .send()
205 .await?;
206
207 if !response.status().is_success() {
208 return Err(OpenPondError::ApiError {
209 status: response.status().as_u16(),
210 message: response.text().await?,
211 });
212 }
213
214 let data: serde_json::Value = response.json().await?;
215 Ok(serde_json::from_value(data["agents"].clone())?)
216 }
217
218 async fn register_agent(&self) -> Result<()> {
219 let response = self.client
220 .post(&format!("{}/agents/register", self.config.api_url))
221 .json(&serde_json::json!({
222 "privateKey": self.config.private_key,
223 "name": self.config.agent_name,
224 }))
225 .send()
226 .await?;
227
228 if !response.status().is_success() && response.status() != reqwest::StatusCode::CONFLICT {
230 return Err(OpenPondError::ApiError {
231 status: response.status().as_u16(),
232 message: response.text().await?,
233 });
234 }
235
236 Ok(())
237 }
238}