miyabi_a2a/storage/
github.rs

1//! GitHub Issues-based task storage
2
3use super::{
4    cursor::{PaginatedResult, PaginationCursor},
5    StorageError, TaskFilter, TaskStorage, TaskUpdate,
6};
7use crate::task::{A2ATask, TaskStatus, TaskType};
8use async_trait::async_trait;
9use octocrab::{models::issues::Issue, Octocrab};
10use tracing::{debug, info};
11
12/// GitHub Issues task storage implementation
13pub struct GitHubTaskStorage {
14    client: Octocrab,
15    repo_owner: String,
16    repo_name: String,
17}
18
19impl GitHubTaskStorage {
20    /// Create a new GitHub task storage
21    ///
22    /// # Arguments
23    /// * `token` - GitHub personal access token
24    /// * `repo_owner` - Repository owner (org or user)
25    /// * `repo_name` - Repository name
26    pub fn new(token: String, repo_owner: String, repo_name: String) -> Result<Self, StorageError> {
27        let client = Octocrab::builder()
28            .personal_token(token)
29            .build()
30            .map_err(|e| StorageError::Auth(e.to_string()))?;
31
32        Ok(Self {
33            client,
34            repo_owner,
35            repo_name,
36        })
37    }
38
39    /// Convert GitHub Issue to A2ATask
40    ///
41    /// Extracts task information from GitHub Issue metadata:
42    /// - Status: From `a2a:pending`, `a2a:in-progress`, etc. labels
43    /// - Task Type: From `a2a:codegen`, `a2a:review`, etc. labels
44    /// - Agent: From Issue assignee
45    /// - Timestamps: From Issue created_at/updated_at
46    ///
47    /// # Arguments
48    /// * `issue` - GitHub Issue to convert
49    ///
50    /// # Returns
51    /// Converted A2ATask with metadata extracted from labels and fields
52    fn issue_to_task(&self, issue: Issue) -> Result<A2ATask, StorageError> {
53        // Extract status from labels
54        let status = issue
55            .labels
56            .iter()
57            .find_map(|label| {
58                let name = label.name.as_str();
59                name.strip_prefix("a2a:").and_then(|status| {
60                    match status {
61                        "submitted" => Some(TaskStatus::Submitted),
62                        "working" => Some(TaskStatus::Working),
63                        "completed" => Some(TaskStatus::Completed),
64                        "failed" => Some(TaskStatus::Failed),
65                        "cancelled" => Some(TaskStatus::Cancelled),
66                        // Legacy label compatibility
67                        "pending" => Some(TaskStatus::Submitted),
68                        "in-progress" => Some(TaskStatus::Working),
69                        _ => None,
70                    }
71                })
72            })
73            .unwrap_or(TaskStatus::Submitted);
74
75        // Extract task type from labels
76        let task_type = issue
77            .labels
78            .iter()
79            .find_map(|label| {
80                let name = label.name.as_str();
81                name.strip_prefix("a2a:").and_then(|task_type| match task_type {
82                    "codegen" => Some(TaskType::CodeGeneration),
83                    "review" => Some(TaskType::CodeReview),
84                    "testing" => Some(TaskType::Testing),
85                    "deployment" => Some(TaskType::Deployment),
86                    "documentation" => Some(TaskType::Documentation),
87                    "analysis" => Some(TaskType::Analysis),
88                    _ => None,
89                })
90            })
91            .unwrap_or(TaskType::Analysis);
92
93        // Extract agent from assignee
94        let agent = issue.assignee.as_ref().map(|a| a.login.clone());
95
96        // Extract retry_count from label (format: "retry:N")
97        let retry_count = issue
98            .labels
99            .iter()
100            .find_map(|label| {
101                label
102                    .name
103                    .strip_prefix("retry:")
104                    .and_then(|count_str| count_str.parse::<u32>().ok())
105            })
106            .unwrap_or(0);
107
108        Ok(A2ATask {
109            id: issue.number,
110            title: issue.title,
111            description: issue.body.unwrap_or_default(),
112            status,
113            task_type,
114            agent,
115            context_id: None, // TODO: Extract from body or custom field
116            priority: 3,      // TODO: Extract from labels
117            retry_count,
118            created_at: issue.created_at,
119            updated_at: issue.updated_at,
120            issue_url: issue.html_url.to_string(),
121        })
122    }
123}
124
125#[async_trait]
126impl TaskStorage for GitHubTaskStorage {
127    /// Save a new task by creating a GitHub Issue
128    ///
129    /// Creates a GitHub Issue with:
130    /// - Title and description from task
131    /// - Status label (e.g., `a2a:pending`)
132    /// - Task type label (e.g., `a2a:codegen`)
133    ///
134    /// # Arguments
135    /// * `task` - The task to save
136    ///
137    /// # Returns
138    /// GitHub Issue number (task ID)
139    async fn save_task(&self, task: A2ATask) -> Result<u64, StorageError> {
140        info!("Creating GitHub Issue for task: {}", task.title);
141
142        let issue = self
143            .client
144            .issues(&self.repo_owner, &self.repo_name)
145            .create(&task.title)
146            .body(&task.description)
147            .labels(vec![task.status.to_label(), task.task_type.to_label()])
148            .send()
149            .await?;
150
151        debug!("Created GitHub Issue #{}: {}", issue.number, issue.html_url);
152
153        Ok(issue.number)
154    }
155
156    /// Get a task by ID (GitHub Issue number)
157    ///
158    /// # Arguments
159    /// * `id` - GitHub Issue number
160    ///
161    /// # Returns
162    /// - `Some(task)` if found
163    /// - `None` if not found (404)
164    /// - `Err` for other errors
165    async fn get_task(&self, id: u64) -> Result<Option<A2ATask>, StorageError> {
166        debug!("Fetching task #{} from GitHub", id);
167
168        match self.client.issues(&self.repo_owner, &self.repo_name).get(id).await {
169            Ok(issue) => {
170                let task = self.issue_to_task(issue)?;
171                Ok(Some(task))
172            },
173            Err(octocrab::Error::GitHub { source, .. }) if source.message.contains("Not Found") => {
174                debug!("Task #{} not found", id);
175                Ok(None)
176            },
177            Err(e) => Err(StorageError::from(e)),
178        }
179    }
180
181    /// List tasks with optional filtering
182    ///
183    /// Uses GitHub API label filtering for status, combined with
184    /// in-memory filtering for other criteria.
185    ///
186    /// # Arguments
187    /// * `filter` - Filter criteria (status, context_id, agent, updated_after, limit)
188    ///
189    /// # Returns
190    /// List of tasks matching the filter
191    ///
192    /// # Performance
193    /// - Status filtering: API-level (reduces network transfer)
194    /// - Other filters: In-memory (applied after fetch)
195    async fn list_tasks(&self, filter: TaskFilter) -> Result<Vec<A2ATask>, StorageError> {
196        debug!("Listing tasks with filter: {:?}", filter);
197
198        // Build GitHub API query with label filtering
199        let issues = self.client.issues(&self.repo_owner, &self.repo_name);
200        let per_page = filter.limit.unwrap_or(30) as u8;
201
202        // Apply status filter at API level via labels
203        let page = if let Some(status) = filter.status {
204            let label = status.to_label();
205            let labels = vec![label.clone()];
206            debug!("Applying API-level status filter: {}", label);
207
208            issues.list().labels(&labels).per_page(per_page).send().await?
209        } else {
210            issues.list().per_page(per_page).send().await?
211        };
212
213        // Convert GitHub Issues to A2ATasks
214        let all_tasks: Result<Vec<A2ATask>, StorageError> =
215            page.items.into_iter().map(|issue| self.issue_to_task(issue)).collect();
216
217        let mut tasks = all_tasks?;
218
219        // Apply remaining filters in memory
220        // (Status already filtered at API level)
221
222        if let Some(ref context_id) = filter.context_id {
223            tasks.retain(|t| t.context_id.as_ref() == Some(context_id));
224        }
225
226        if let Some(ref agent) = filter.agent {
227            tasks.retain(|t| t.agent.as_ref() == Some(agent));
228        }
229
230        if let Some(after) = filter.last_updated_after {
231            tasks.retain(|t| t.updated_at > after);
232        }
233
234        Ok(tasks)
235    }
236
237    /// List tasks with cursor-based pagination
238    ///
239    /// Implements cursor-based pagination for efficient navigation through large task lists.
240    /// Cursors encode the (id, updated_at) of the last item, ensuring stable pagination.
241    ///
242    /// # Arguments
243    /// * `filter` - Filter criteria including optional cursor
244    ///
245    /// # Returns
246    /// PaginatedResult with items, next_cursor, previous_cursor, and has_more flag
247    ///
248    /// # Pagination Strategy
249    /// - **Forward**: Fetch limit+1 items after cursor to determine if has_more
250    /// - **Backward**: Fetch limit+1 items before cursor
251    /// - **First page**: No cursor provided, start from beginning
252    ///
253    /// # Examples
254    ///
255    /// ```no_run
256    /// use miyabi_a2a::{GitHubTaskStorage, TaskStorage, TaskFilter};
257    ///
258    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
259    /// let storage = GitHubTaskStorage::new("token".into(), "owner".into(), "repo".into())?;
260    ///
261    /// // First page (50 items)
262    /// let filter = TaskFilter { limit: Some(50), ..Default::default() };
263    /// let page1 = storage.list_tasks_paginated(filter).await?;
264    ///
265    /// // Navigate to next page
266    /// if let Some(cursor) = page1.next_cursor {
267    ///     let filter = TaskFilter { cursor: Some(cursor), limit: Some(50), ..Default::default() };
268    ///     let page2 = storage.list_tasks_paginated(filter).await?;
269    /// }
270    /// # Ok(())
271    /// # }
272    /// ```
273    async fn list_tasks_paginated(
274        &self,
275        filter: TaskFilter,
276    ) -> Result<PaginatedResult<A2ATask>, StorageError> {
277        debug!("Listing tasks with pagination: {:?}", filter);
278
279        // Default limit: 50, max: 100
280        let limit = filter.limit.unwrap_or(50).min(100);
281
282        // Decode cursor if provided
283        let cursor = filter
284            .cursor
285            .as_ref()
286            .map(|c| PaginationCursor::decode(c))
287            .transpose()
288            .map_err(|e| StorageError::Other(format!("Invalid cursor: {}", e)))?;
289
290        // Build GitHub API query
291        let issues = self.client.issues(&self.repo_owner, &self.repo_name);
292
293        // Fetch limit+1 items to determine has_more
294        let fetch_count = (limit + 1) as u8;
295
296        // Apply status filter at API level
297        let page = if let Some(status) = filter.status {
298            let label = status.to_label();
299            let labels = vec![label.clone()];
300            debug!("Applying API-level status filter: {}", label);
301
302            issues.list().labels(&labels).per_page(fetch_count).send().await?
303        } else {
304            issues.list().per_page(fetch_count).send().await?
305        };
306
307        // Convert to tasks
308        let all_tasks: Result<Vec<A2ATask>, StorageError> =
309            page.items.into_iter().map(|issue| self.issue_to_task(issue)).collect();
310
311        let mut tasks = all_tasks?;
312
313        // Apply cursor filtering if present
314        if let Some(ref c) = cursor {
315            match c.direction {
316                super::cursor::Direction::Forward => {
317                    // Forward: Keep tasks with (updated_at, id) > cursor
318                    tasks.retain(|t| {
319                        t.updated_at > c.last_updated
320                            || (t.updated_at == c.last_updated && t.id > c.last_id)
321                    });
322                },
323                super::cursor::Direction::Backward => {
324                    // Backward: Keep tasks with (updated_at, id) < cursor
325                    tasks.retain(|t| {
326                        t.updated_at < c.last_updated
327                            || (t.updated_at == c.last_updated && t.id < c.last_id)
328                    });
329                    // Reverse order for backward pagination
330                    tasks.reverse();
331                },
332            }
333        }
334
335        // Apply remaining filters in memory
336        if let Some(ref context_id) = filter.context_id {
337            tasks.retain(|t| t.context_id.as_ref() == Some(context_id));
338        }
339
340        if let Some(ref agent) = filter.agent {
341            tasks.retain(|t| t.agent.as_ref() == Some(agent));
342        }
343
344        if let Some(after) = filter.last_updated_after {
345            tasks.retain(|t| t.updated_at > after);
346        }
347
348        // Sort by (updated_at DESC, id DESC) for stable ordering
349        tasks.sort_by(|a, b| b.updated_at.cmp(&a.updated_at).then_with(|| b.id.cmp(&a.id)));
350
351        // Determine has_more and trim to limit
352        let has_more = tasks.len() > limit;
353        if has_more {
354            tasks.truncate(limit);
355        }
356
357        // Generate cursors
358        let next_cursor =
359            if has_more && !tasks.is_empty() {
360                let last = tasks.last().unwrap();
361                let cursor = PaginationCursor::forward(last.id, last.updated_at);
362                Some(cursor.encode().map_err(|e| {
363                    StorageError::Other(format!("Failed to encode next cursor: {}", e))
364                })?)
365            } else {
366                None
367            };
368
369        let previous_cursor = if !tasks.is_empty() && cursor.is_some() {
370            let first = tasks.first().unwrap();
371            let cursor = PaginationCursor::backward(first.id, first.updated_at);
372            Some(cursor.encode().map_err(|e| {
373                StorageError::Other(format!("Failed to encode previous cursor: {}", e))
374            })?)
375        } else {
376            None
377        };
378
379        Ok(PaginatedResult::new(tasks, next_cursor, previous_cursor, has_more))
380    }
381
382    /// Update an existing task
383    ///
384    /// Updates task fields via GitHub Issue API:
385    /// - **Description**: Updates Issue body
386    /// - **Status**: Updates labels (removes old status label, adds new one)
387    ///
388    /// Status update process:
389    /// 1. Fetch current Issue to get existing labels
390    /// 2. Filter out old status labels (a2a:pending, a2a:in-progress, etc.)
391    /// 3. Add new status label
392    /// 4. Update Issue with new label set
393    ///
394    /// # Arguments
395    /// * `id` - GitHub Issue number
396    /// * `update` - Fields to update
397    ///
398    /// # Returns
399    /// Ok(()) on success
400    async fn update_task(&self, id: u64, update: TaskUpdate) -> Result<(), StorageError> {
401        info!("Updating task #{}", id);
402
403        // Update description if provided
404        if let Some(description) = update.description {
405            self.client
406                .issues(&self.repo_owner, &self.repo_name)
407                .update(id)
408                .body(&description)
409                .send()
410                .await?;
411
412            debug!("Updated task #{} description", id);
413        }
414
415        // Update status or retry_count via labels
416        if update.status.is_some() || update.retry_count.is_some() {
417            // Fetch current issue to get existing labels
418            let issue = self.client.issues(&self.repo_owner, &self.repo_name).get(id).await?;
419
420            // Filter out old status labels and retry labels
421            let mut new_labels: Vec<String> = issue
422                .labels
423                .iter()
424                .filter(|label| {
425                    !label.name.starts_with("a2a:pending")
426                        && !label.name.starts_with("a2a:in-progress")
427                        && !label.name.starts_with("a2a:completed")
428                        && !label.name.starts_with("a2a:failed")
429                        && !label.name.starts_with("a2a:blocked")
430                        && !label.name.starts_with("retry:")
431                })
432                .map(|label| label.name.clone())
433                .collect();
434
435            // Add new status label if provided
436            if let Some(new_status) = update.status {
437                new_labels.push(new_status.to_label());
438                debug!("Updated task #{} status to {:?}", id, new_status);
439            }
440
441            // Add new retry_count label if provided
442            if let Some(retry_count) = update.retry_count {
443                new_labels.push(format!("retry:{}", retry_count));
444                debug!("Updated task #{} retry_count to {}", id, retry_count);
445            }
446
447            // Update issue labels
448            self.client
449                .issues(&self.repo_owner, &self.repo_name)
450                .update(id)
451                .labels(&new_labels)
452                .send()
453                .await?;
454        }
455
456        Ok(())
457    }
458
459    /// Delete a task (closes GitHub Issue)
460    ///
461    /// GitHub Issues cannot be permanently deleted, so this method
462    /// closes the Issue instead. Closed Issues remain in the repository
463    /// for audit trail purposes.
464    ///
465    /// # Arguments
466    /// * `id` - GitHub Issue number
467    ///
468    /// # Returns
469    /// Ok(()) on success
470    async fn delete_task(&self, id: u64) -> Result<(), StorageError> {
471        info!("Closing task #{} (GitHub Issues don't support deletion)", id);
472
473        // GitHub Issues can't be deleted, only closed
474        use octocrab::models::IssueState;
475
476        self.client
477            .issues(&self.repo_owner, &self.repo_name)
478            .update(id)
479            .state(IssueState::Closed)
480            .send()
481            .await?;
482
483        debug!("Closed task #{}", id);
484
485        Ok(())
486    }
487}
488
489#[cfg(test)]
490mod tests {
491    use super::*;
492    use crate::storage::cursor::{Direction, PaginationCursor};
493    use chrono::Utc;
494
495    #[tokio::test]
496    async fn test_github_storage_construction() {
497        // This test just ensures the struct can be created
498        // Actual API tests require a real GitHub token
499        let result = GitHubTaskStorage::new(
500            "fake_token".to_string(),
501            "owner".to_string(),
502            "repo".to_string(),
503        );
504        assert!(result.is_ok());
505    }
506
507    #[test]
508    fn test_cursor_encode_decode_roundtrip() {
509        let timestamp = Utc::now();
510        let cursor = PaginationCursor::forward(123, timestamp);
511
512        let encoded = cursor.encode().unwrap();
513        let decoded = PaginationCursor::decode(&encoded).unwrap();
514
515        assert_eq!(cursor.last_id, decoded.last_id);
516        assert_eq!(cursor.last_updated, decoded.last_updated);
517        assert_eq!(cursor.direction, decoded.direction);
518    }
519
520    #[test]
521    fn test_forward_cursor_creation() {
522        let timestamp = Utc::now();
523        let cursor = PaginationCursor::forward(456, timestamp);
524
525        assert_eq!(cursor.last_id, 456);
526        assert_eq!(cursor.last_updated, timestamp);
527        assert_eq!(cursor.direction, Direction::Forward);
528    }
529
530    #[test]
531    fn test_backward_cursor_creation() {
532        let timestamp = Utc::now();
533        let cursor = PaginationCursor::backward(789, timestamp);
534
535        assert_eq!(cursor.last_id, 789);
536        assert_eq!(cursor.last_updated, timestamp);
537        assert_eq!(cursor.direction, Direction::Backward);
538    }
539
540    #[test]
541    fn test_paginated_result_structure() {
542        let result = PaginatedResult::new(
543            vec![1, 2, 3],
544            Some("next_cursor".to_string()),
545            Some("prev_cursor".to_string()),
546            true,
547        );
548
549        assert_eq!(result.items.len(), 3);
550        assert!(result.next_cursor.is_some());
551        assert!(result.previous_cursor.is_some());
552        assert!(result.has_more);
553    }
554
555    #[test]
556    fn test_paginated_result_last_page() {
557        let result: PaginatedResult<i32> = PaginatedResult::new(vec![1, 2], None, None, false);
558
559        assert_eq!(result.items.len(), 2);
560        assert!(result.next_cursor.is_none());
561        assert!(result.previous_cursor.is_none());
562        assert!(!result.has_more);
563    }
564}