Skip to main content

do_memory_storage_turso/
relationships.rs

1//! Storage implementation for episode relationships in Turso.
2//!
3//! Provides CRUD operations for managing relationships between episodes.
4
5use crate::{Result, TursoStorage};
6use do_memory_core::episode::{
7    Direction, EpisodeRelationship, RelationshipMetadata, RelationshipType,
8};
9use std::collections::HashMap;
10use tracing::debug;
11use uuid::Uuid;
12
13impl TursoStorage {
14    /// Add a relationship between two episodes
15    pub async fn add_relationship(
16        &self,
17        from_episode_id: Uuid,
18        to_episode_id: Uuid,
19        relationship_type: RelationshipType,
20        metadata: RelationshipMetadata,
21    ) -> Result<Uuid> {
22        let conn = self.get_connection().await?;
23        let relationship_id = Uuid::new_v4();
24        let created_at = chrono::Utc::now().timestamp();
25
26        let metadata_json = serde_json::to_string(&metadata.custom_fields)
27            .map_err(|e| do_memory_core::Error::Storage(format!("Serialization error: {}", e)))?;
28
29        self.execute_with_retry(
30            &conn,
31            &format!(
32                "{} VALUES ('{}', '{}', '{}', '{}', {}, {}, {}, '{}', {})",
33                "INSERT INTO episode_relationships (relationship_id, from_episode_id, to_episode_id, relationship_type, reason, created_by, priority, metadata, created_at)",
34                relationship_id,
35                from_episode_id,
36                to_episode_id,
37                relationship_type.as_str(),
38                metadata.reason.as_ref().map(|r| format!("'{}'", r.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string()),
39                metadata.created_by.as_ref().map(|c| format!("'{}'", c.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string()),
40                metadata.priority.map(|p| p.to_string()).unwrap_or_else(|| "NULL".to_string()),
41                metadata_json.replace('\'', "''"),
42                created_at
43            ),
44        )
45        .await?;
46
47        debug!(
48            "Added relationship {} from {} to {} (type: {:?})",
49            relationship_id, from_episode_id, to_episode_id, relationship_type
50        );
51
52        Ok(relationship_id)
53    }
54
55    /// Store a relationship between two episodes
56    ///
57    /// This is the StorageBackend trait implementation that takes a pre-built EpisodeRelationship.
58    pub async fn store_relationship(&self, relationship: &EpisodeRelationship) -> Result<()> {
59        let conn = self.get_connection().await?;
60        let created_at = relationship.created_at.timestamp();
61
62        let metadata_json = serde_json::to_string(&relationship.metadata.custom_fields)
63            .map_err(|e| do_memory_core::Error::Storage(format!("Serialization error: {}", e)))?;
64
65        self.execute_with_retry(
66            &conn,
67            &format!(
68                "{} VALUES ('{}', '{}', '{}', '{}', {}, {}, {}, '{}', {})",
69                "INSERT INTO episode_relationships (relationship_id, from_episode_id, to_episode_id, relationship_type, reason, created_by, priority, metadata, created_at)",
70                relationship.id,
71                relationship.from_episode_id,
72                relationship.to_episode_id,
73                relationship.relationship_type.as_str(),
74                relationship.metadata.reason.as_ref().map(|r| format!("'{}'", r.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string()),
75                relationship.metadata.created_by.as_ref().map(|c| format!("'{}'", c.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string()),
76                relationship.metadata.priority.map(|p| p.to_string()).unwrap_or_else(|| "NULL".to_string()),
77                metadata_json.replace('\'', "''"),
78                created_at
79            ),
80        )
81        .await?;
82
83        debug!(
84            "Stored relationship {} from {} to {} (type: {:?})",
85            relationship.id,
86            relationship.from_episode_id,
87            relationship.to_episode_id,
88            relationship.relationship_type
89        );
90
91        Ok(())
92    }
93
94    /// Remove a relationship by ID
95    pub async fn remove_relationship(&self, relationship_id: Uuid) -> Result<()> {
96        let conn = self.get_connection().await?;
97
98        let sql = format!(
99            "DELETE FROM episode_relationships WHERE relationship_id = '{}'",
100            relationship_id
101        );
102
103        self.execute_with_retry(&conn, &sql).await?;
104
105        debug!("Removed relationship {}", relationship_id);
106        Ok(())
107    }
108
109    /// Get relationships for an episode
110    pub async fn get_relationships(
111        &self,
112        episode_id: Uuid,
113        direction: Direction,
114    ) -> Result<Vec<EpisodeRelationship>> {
115        let conn = self.get_connection().await?;
116
117        let sql = match direction {
118            Direction::Outgoing => format!(
119                "SELECT * FROM episode_relationships WHERE from_episode_id = '{}'",
120                episode_id
121            ),
122            Direction::Incoming => format!(
123                "SELECT * FROM episode_relationships WHERE to_episode_id = '{}'",
124                episode_id
125            ),
126            Direction::Both => format!(
127                "SELECT * FROM episode_relationships WHERE from_episode_id = '{}' OR to_episode_id = '{}'",
128                episode_id, episode_id
129            ),
130        };
131
132        let stmt = conn.prepare(&sql).await.map_err(|e| {
133            do_memory_core::Error::Storage(format!("Failed to prepare query: {}", e))
134        })?;
135
136        let mut rows = stmt.query(()).await.map_err(|e| {
137            do_memory_core::Error::Storage(format!("Failed to execute query: {}", e))
138        })?;
139
140        let mut relationships = Vec::new();
141
142        while let Some(row) = rows
143            .next()
144            .await
145            .map_err(|e| do_memory_core::Error::Storage(format!("Failed to fetch row: {}", e)))?
146        {
147            let relationship = self.row_to_relationship(&row)?;
148            relationships.push(relationship);
149        }
150
151        debug!(
152            "Found {} relationships for episode {} (direction: {:?})",
153            relationships.len(),
154            episode_id,
155            direction
156        );
157
158        Ok(relationships)
159    }
160
161    /// Get relationships by type
162    pub async fn get_relationships_by_type(
163        &self,
164        episode_id: Uuid,
165        relationship_type: RelationshipType,
166        direction: Direction,
167    ) -> Result<Vec<EpisodeRelationship>> {
168        let conn = self.get_connection().await?;
169
170        let sql = match direction {
171            Direction::Outgoing => format!(
172                "SELECT * FROM episode_relationships WHERE from_episode_id = '{}' AND relationship_type = '{}'",
173                episode_id,
174                relationship_type.as_str()
175            ),
176            Direction::Incoming => format!(
177                "SELECT * FROM episode_relationships WHERE to_episode_id = '{}' AND relationship_type = '{}'",
178                episode_id,
179                relationship_type.as_str()
180            ),
181            Direction::Both => format!(
182                "SELECT * FROM episode_relationships WHERE (from_episode_id = '{}' OR to_episode_id = '{}') AND relationship_type = '{}'",
183                episode_id,
184                episode_id,
185                relationship_type.as_str()
186            ),
187        };
188
189        let stmt = conn.prepare(&sql).await.map_err(|e| {
190            do_memory_core::Error::Storage(format!("Failed to prepare query: {}", e))
191        })?;
192
193        let mut rows = stmt.query(()).await.map_err(|e| {
194            do_memory_core::Error::Storage(format!("Failed to execute query: {}", e))
195        })?;
196
197        let mut relationships = Vec::new();
198
199        while let Some(row) = rows
200            .next()
201            .await
202            .map_err(|e| do_memory_core::Error::Storage(format!("Failed to fetch row: {}", e)))?
203        {
204            let relationship = self.row_to_relationship(&row)?;
205            relationships.push(relationship);
206        }
207
208        Ok(relationships)
209    }
210
211    /// Check if a relationship exists
212    pub async fn relationship_exists(
213        &self,
214        from_episode_id: Uuid,
215        to_episode_id: Uuid,
216        relationship_type: RelationshipType,
217    ) -> Result<bool> {
218        let conn = self.get_connection().await?;
219
220        let sql = format!(
221            "SELECT COUNT(*) as count FROM episode_relationships WHERE from_episode_id = '{}' AND to_episode_id = '{}' AND relationship_type = '{}'",
222            from_episode_id,
223            to_episode_id,
224            relationship_type.as_str()
225        );
226
227        let stmt = conn.prepare(&sql).await.map_err(|e| {
228            do_memory_core::Error::Storage(format!("Failed to prepare query: {}", e))
229        })?;
230
231        let mut rows = stmt.query(()).await.map_err(|e| {
232            do_memory_core::Error::Storage(format!("Failed to execute query: {}", e))
233        })?;
234
235        if let Some(row) = rows
236            .next()
237            .await
238            .map_err(|e| do_memory_core::Error::Storage(format!("Failed to fetch row: {}", e)))?
239        {
240            let count: i64 = row.get(0).map_err(|e| {
241                do_memory_core::Error::Storage(format!("Failed to get count: {}", e))
242            })?;
243            Ok(count > 0)
244        } else {
245            Ok(false)
246        }
247    }
248
249    /// Helper to convert a database row to an EpisodeRelationship
250    fn row_to_relationship(&self, row: &libsql::Row) -> Result<EpisodeRelationship> {
251        let relationship_id_str: String = row.get(0).map_err(|e| {
252            do_memory_core::Error::Storage(format!("Failed to get relationship_id: {}", e))
253        })?;
254        let from_episode_id_str: String = row.get(1).map_err(|e| {
255            do_memory_core::Error::Storage(format!("Failed to get from_episode_id: {}", e))
256        })?;
257        let to_episode_id_str: String = row.get(2).map_err(|e| {
258            do_memory_core::Error::Storage(format!("Failed to get to_episode_id: {}", e))
259        })?;
260        let relationship_type_str: String = row.get(3).map_err(|e| {
261            do_memory_core::Error::Storage(format!("Failed to get relationship_type: {}", e))
262        })?;
263
264        let reason: Option<String> = row.get(4).ok();
265        let created_by: Option<String> = row.get(5).ok();
266        let priority: Option<i64> = row.get(6).ok();
267        let metadata_json: String = row.get(7).map_err(|e| {
268            do_memory_core::Error::Storage(format!("Failed to get metadata: {}", e))
269        })?;
270        let created_at_timestamp: i64 = row.get(8).map_err(|e| {
271            do_memory_core::Error::Storage(format!("Failed to get created_at: {}", e))
272        })?;
273
274        let relationship_id = Uuid::parse_str(&relationship_id_str).map_err(|e| {
275            do_memory_core::Error::Storage(format!("Invalid relationship_id UUID: {}", e))
276        })?;
277        let from_episode_id = Uuid::parse_str(&from_episode_id_str).map_err(|e| {
278            do_memory_core::Error::Storage(format!("Invalid from_episode_id UUID: {}", e))
279        })?;
280        let to_episode_id = Uuid::parse_str(&to_episode_id_str).map_err(|e| {
281            do_memory_core::Error::Storage(format!("Invalid to_episode_id UUID: {}", e))
282        })?;
283
284        let relationship_type = RelationshipType::parse(&relationship_type_str)
285            .map_err(do_memory_core::Error::Storage)?;
286
287        let custom_fields: HashMap<String, String> = serde_json::from_str(&metadata_json)
288            .map_err(|e| do_memory_core::Error::Storage(e.to_string()))?;
289
290        let metadata = RelationshipMetadata {
291            reason,
292            created_by,
293            priority: priority.map(|p| p as u8),
294            custom_fields,
295        };
296
297        let created_at = chrono::DateTime::from_timestamp(created_at_timestamp, 0)
298            .ok_or_else(|| do_memory_core::Error::Storage("Invalid timestamp".to_string()))?;
299
300        Ok(EpisodeRelationship {
301            id: relationship_id,
302            from_episode_id,
303            to_episode_id,
304            relationship_type,
305            metadata,
306            created_at,
307        })
308    }
309
310    /// Get all episodes that depend on the given episode (blocking it)
311    pub async fn get_dependent_episodes(&self, episode_id: Uuid) -> Result<Vec<Uuid>> {
312        let relationships = self
313            .get_relationships_by_type(episode_id, RelationshipType::DependsOn, Direction::Incoming)
314            .await?;
315
316        Ok(relationships
317            .into_iter()
318            .map(|r| r.from_episode_id)
319            .collect())
320    }
321
322    /// Get all episodes that the given episode depends on
323    pub async fn get_dependencies(&self, episode_id: Uuid) -> Result<Vec<Uuid>> {
324        let relationships = self
325            .get_relationships_by_type(episode_id, RelationshipType::DependsOn, Direction::Outgoing)
326            .await?;
327
328        Ok(relationships.into_iter().map(|r| r.to_episode_id).collect())
329    }
330}
331
332#[cfg(test)]
333#[path = "relationships_tests.rs"]
334mod tests;