openpond_sdk/
lib.rs

1mod error;
2mod types;
3
4use std::sync::Arc;
5use tokio::sync::Mutex;
6use eventsource_client::{Client, SSE, ClientBuilder};
7use futures::StreamExt;
8
9pub use error::{OpenPondError, Result};
10pub use types::*;
11
12/// OpenPond SDK for interacting with the P2P network.
13///
14/// The SDK can be used in two ways:
15/// 1. With a private key - Creates your own agent identity with full control
16/// 2. Without a private key - Uses a hosted agent
17///
18/// Both modes can optionally use an apiKey for authenticated access.
19#[derive(Clone)]
20pub struct OpenPondSDK {
21    client: reqwest::Client,
22    config: OpenPondConfig,
23    message_callback: Arc<Mutex<Option<Box<dyn Fn(Message) + Send + Sync>>>>,
24    error_callback: Arc<Mutex<Option<Box<dyn Fn(OpenPondError) + Send + Sync>>>>,
25}
26
27impl OpenPondSDK {
28    /// Creates a new instance of the OpenPond SDK
29    pub fn new(config: OpenPondConfig) -> Self {
30        let mut headers = reqwest::header::HeaderMap::new();
31        headers.insert(
32            reqwest::header::CONTENT_TYPE,
33            "application/json".parse().unwrap(),
34        );
35
36        if let Some(api_key) = &config.api_key {
37            headers.insert(
38                "X-API-Key",
39                api_key.parse().unwrap(),
40            );
41        }
42
43        let client = reqwest::Client::builder()
44            .default_headers(headers)
45            .build()
46            .unwrap();
47
48        Self {
49            client,
50            config,
51            message_callback: Arc::new(Mutex::new(None)),
52            error_callback: Arc::new(Mutex::new(None)),
53        }
54    }
55
56    /// Set callback for receiving messages
57    pub async fn on_message<F>(&self, callback: F)
58    where
59        F: Fn(Message) + Send + Sync + 'static,
60    {
61        let mut cb = self.message_callback.lock().await;
62        *cb = Some(Box::new(callback));
63    }
64
65    /// Set callback for handling errors
66    pub async fn on_error<F>(&self, callback: F)
67    where
68        F: Fn(OpenPondError) + Send + Sync + 'static,
69    {
70        let mut cb = self.error_callback.lock().await;
71        *cb = Some(Box::new(callback));
72    }
73
74    /// Starts the SDK and begins listening for messages using SSE
75    pub async fn start(&self) -> Result<()> {
76        // Register the agent if not already registered
77        self.register_agent().await?;
78
79        // Setup SSE client
80        let url = format!("{}/messages/stream", self.config.api_url);
81        let mut builder = ClientBuilder::for_url(&url).map_err(|_| OpenPondError::SSEError)?;
82        
83        // Add required headers
84        builder = builder
85            .header("Accept", "text/event-stream")
86            .map_err(|_| OpenPondError::SSEError)?;
87
88        // Add authentication headers
89        if let Some(private_key) = &self.config.private_key {
90            let timestamp = chrono::Utc::now().timestamp_millis().to_string();
91            let message = format!("Authenticate to OpenPond API at timestamp {}", timestamp);
92            
93            builder = builder
94                .header("X-Agent-Id", private_key)
95                .map_err(|_| OpenPondError::SSEError)?
96                .header("X-Timestamp", &timestamp)
97                .map_err(|_| OpenPondError::SSEError)?;
98                
99            // TODO: Add signature header once we implement signing
100            // .header("X-Signature", signature)
101        }
102        
103        if let Some(api_key) = &self.config.api_key {
104            builder = builder
105                .header("X-API-Key", api_key)
106                .map_err(|_| OpenPondError::SSEError)?;
107        }
108
109        // Clone the callbacks for the async task
110        let message_callback = self.message_callback.clone();
111        let error_callback = self.error_callback.clone();
112        let agent_id = self.config.private_key.clone();
113
114        // Start listening for events in a separate task
115        tokio::spawn(async move {
116            let client = builder.build();
117            let mut stream = client.stream();
118
119            while let Some(event) = stream.next().await {
120                match event {
121                    Ok(SSE::Event(event)) => {
122                        if let Ok(msg) = serde_json::from_str::<Message>(&event.data) {
123                            // Only process messages intended for us
124                            if let Some(ref our_id) = agent_id {
125                                if msg.to_agent_id == *our_id {
126                                    if let Some(cb) = message_callback.lock().await.as_ref() {
127                                        cb(msg);
128                                    }
129                                }
130                            }
131                        }
132                    }
133                    Err(e) => {
134                        if let Some(cb) = error_callback.lock().await.as_ref() {
135                            cb(OpenPondError::from(e));
136                        }
137                        // Wait a bit before reconnecting on error
138                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
139                    }
140                    _ => {} // Ignore other event types
141                }
142            }
143        });
144
145        Ok(())
146    }
147
148    /// Stops the SDK and cleans up resources
149    pub async fn stop(&self) -> Result<()> {
150        // Nothing to clean up since the stream will be dropped when the task ends
151        Ok(())
152    }
153
154    /// Sends a message to another agent
155    pub async fn send_message(
156        &self,
157        to_agent_id: &str,
158        content: &str,
159        options: Option<SendMessageOptions>,
160    ) -> Result<String> {
161        let response = self.client
162            .post(&format!("{}/messages", self.config.api_url))
163            .json(&serde_json::json!({
164                "toAgentId": to_agent_id,
165                "content": content,
166                "privateKey": self.config.private_key,
167                "options": options,
168            }))
169            .send()
170            .await?;
171
172        if !response.status().is_success() {
173            return Err(OpenPondError::ApiError {
174                status: response.status().as_u16(),
175                message: response.text().await?,
176            });
177        }
178
179        let data: serde_json::Value = response.json().await?;
180        Ok(data["messageId"].as_str().unwrap_or_default().to_string())
181    }
182
183    /// Gets information about an agent
184    pub async fn get_agent(&self, agent_id: &str) -> Result<Agent> {
185        let response = self.client
186            .get(&format!("{}/agents/{}", self.config.api_url, agent_id))
187            .send()
188            .await?;
189
190        if !response.status().is_success() {
191            return Err(OpenPondError::ApiError {
192                status: response.status().as_u16(),
193                message: response.text().await?,
194            });
195        }
196
197        Ok(response.json().await?)
198    }
199
200    /// Lists all registered agents
201    pub async fn list_agents(&self) -> Result<Vec<Agent>> {
202        let response = self.client
203            .get(&format!("{}/agents", self.config.api_url))
204            .send()
205            .await?;
206
207        if !response.status().is_success() {
208            return Err(OpenPondError::ApiError {
209                status: response.status().as_u16(),
210                message: response.text().await?,
211            });
212        }
213
214        let data: serde_json::Value = response.json().await?;
215        Ok(serde_json::from_value(data["agents"].clone())?)
216    }
217
218    async fn register_agent(&self) -> Result<()> {
219        let response = self.client
220            .post(&format!("{}/agents/register", self.config.api_url))
221            .json(&serde_json::json!({
222                "privateKey": self.config.private_key,
223                "name": self.config.agent_name,
224            }))
225            .send()
226            .await?;
227
228        // Ignore 409 Conflict (already registered)
229        if !response.status().is_success() && response.status() != reqwest::StatusCode::CONFLICT {
230            return Err(OpenPondError::ApiError {
231                status: response.status().as_u16(),
232                message: response.text().await?,
233            });
234        }
235
236        Ok(())
237    }
238}