Skip to main content

shiplog_ingest_linear/
lib.rs

1//! Linear API ingestor with cache support.
2//!
3//! Collects issue events, tracks coverage slices, and marks partial
4//! completeness when search caps or incomplete API responses are detected.
5
6use anyhow::{Context, Result, anyhow};
7use chrono::{DateTime, NaiveDate, Utc};
8use reqwest::blocking::Client;
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use shiplog_cache::ApiCache;
12use shiplog_ids::{EventId, RunId};
13use shiplog_ports::{IngestOutput, Ingestor};
14use shiplog_schema::coverage::{Completeness, CoverageManifest, CoverageSlice, TimeWindow};
15use shiplog_schema::event::{
16    Actor, EventEnvelope, EventKind, EventPayload, Link, ManualEvent, ManualEventType, RepoRef,
17    RepoVisibility, SourceRef, SourceSystem,
18};
19use std::path::PathBuf;
20use std::thread::sleep;
21use std::time::Duration;
22
23/// Linear issue status filter
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum IssueStatus {
26    Backlog,
27    Todo,
28    InProgress,
29    Done,
30    Cancelled,
31    All,
32}
33
34impl IssueStatus {
35    pub fn as_str(&self) -> &str {
36        match self {
37            Self::Backlog => "backlog",
38            Self::Todo => "todo",
39            Self::InProgress => "in_progress",
40            Self::Done => "done",
41            Self::Cancelled => "cancelled",
42            Self::All => "all",
43        }
44    }
45
46    fn linear_state_type(&self) -> Option<&'static str> {
47        match self {
48            Self::Backlog => Some("backlog"),
49            Self::Todo => Some("unstarted"),
50            Self::InProgress => Some("started"),
51            Self::Done => Some("completed"),
52            Self::Cancelled => Some("canceled"),
53            Self::All => None,
54        }
55    }
56}
57
58impl std::str::FromStr for IssueStatus {
59    type Err = anyhow::Error;
60
61    fn from_str(s: &str) -> Result<Self> {
62        match s.to_lowercase().as_str() {
63            "backlog" => Ok(Self::Backlog),
64            "todo" => Ok(Self::Todo),
65            "in_progress" | "in progress" => Ok(Self::InProgress),
66            "done" | "completed" | "closed" => Ok(Self::Done),
67            "cancelled" | "canceled" => Ok(Self::Cancelled),
68            "all" => Ok(Self::All),
69            _ => Err(anyhow!("Invalid issue status: {}", s)),
70        }
71    }
72}
73
74#[derive(Debug)]
75pub struct LinearIngestor {
76    pub user: String,
77    pub since: NaiveDate,
78    pub until: NaiveDate,
79    pub status: IssueStatus,
80    pub throttle_ms: u64,
81    pub api_key: Option<String>,
82    /// Optional project filter
83    pub project: Option<String>,
84    /// Optional cache for API responses
85    pub cache: Option<ApiCache>,
86}
87
88impl LinearIngestor {
89    pub fn new(user: String, since: NaiveDate, until: NaiveDate) -> Self {
90        Self {
91            user,
92            since,
93            until,
94            status: IssueStatus::Done,
95            throttle_ms: 0,
96            api_key: None,
97            project: None,
98            cache: None,
99        }
100    }
101
102    /// Set the Linear API key.
103    pub fn with_api_key(mut self, api_key: String) -> Result<Self> {
104        if api_key.is_empty() {
105            return Err(anyhow!("Linear API key cannot be empty"));
106        }
107        self.api_key = Some(api_key);
108        Ok(self)
109    }
110
111    /// Set the project filter.
112    pub fn with_project(mut self, project: String) -> Self {
113        self.project = Some(project);
114        self
115    }
116
117    /// Set the issue status filter.
118    pub fn with_status(mut self, status: IssueStatus) -> Self {
119        self.status = status;
120        self
121    }
122
123    /// Enable caching with the given cache directory.
124    pub fn with_cache(mut self, cache_dir: impl Into<PathBuf>) -> Result<Self> {
125        let cache_path = cache_dir.into().join("linear-api-cache.db");
126        if let Some(parent) = cache_path.parent() {
127            std::fs::create_dir_all(parent)
128                .with_context(|| format!("create Linear cache directory {parent:?}"))?;
129        }
130        let cache = ApiCache::open(cache_path)?;
131        self.cache = Some(cache);
132        Ok(self)
133    }
134
135    /// Enable in-memory caching (useful for testing).
136    pub fn with_in_memory_cache(mut self) -> Result<Self> {
137        let cache = ApiCache::open_in_memory()?;
138        self.cache = Some(cache);
139        Ok(self)
140    }
141
142    /// Set throttle delay between API requests (in milliseconds).
143    pub fn with_throttle(mut self, ms: u64) -> Self {
144        self.throttle_ms = ms;
145        self
146    }
147
148    fn html_base_url(&self) -> String {
149        "https://linear.app".to_string()
150    }
151
152    fn api_base_url(&self) -> String {
153        "https://api.linear.app/graphql".to_string()
154    }
155
156    #[mutants::skip]
157    fn client(&self) -> Result<Client> {
158        Client::builder()
159            .user_agent(concat!("shiplog/", env!("CARGO_PKG_VERSION")))
160            .build()
161            .context("build reqwest client")
162    }
163
164    #[mutants::skip]
165    fn throttle(&self) {
166        if self.throttle_ms > 0 {
167            sleep(Duration::from_millis(self.throttle_ms));
168        }
169    }
170
171    /// Execute a GraphQL query
172    #[mutants::skip]
173    fn execute_query<T: DeserializeOwned>(
174        &self,
175        client: &Client,
176        query: &str,
177        variables: &serde_json::Value,
178    ) -> Result<T> {
179        let mut req = client
180            .post(self.api_base_url())
181            .header("Accept", "application/json")
182            .header("Content-Type", "application/json")
183            .json(&serde_json::json!({
184                "query": query,
185                "variables": variables,
186            }));
187
188        // Linear personal API keys are sent as the raw Authorization header value.
189        if let Some(key) = &self.api_key {
190            req = req.header("Authorization", key);
191        }
192
193        let resp = req.send().context("execute Linear GraphQL query")?;
194        self.throttle();
195
196        let status = resp.status();
197        if !status.is_success() {
198            let body = resp.text().unwrap_or_default();
199
200            // Handle specific Linear error cases
201            if status.as_u16() == 401 {
202                return Err(anyhow!(
203                    "Linear authentication failed: invalid or expired API key"
204                ));
205            } else if status.as_u16() == 403 {
206                if body.to_lowercase().contains("rate limit") {
207                    return Err(anyhow!("Linear API rate limit exceeded"));
208                }
209                return Err(anyhow!("Linear API access forbidden: {}", body));
210            } else if status.as_u16() == 404 {
211                return Err(anyhow!("Linear resource not found: {}", body));
212            }
213
214            return Err(anyhow!("Linear API error {status}: {body}"));
215        }
216
217        let response: LinearResponse<T> = resp.json().context("parse Linear GraphQL response")?;
218
219        if let Some(errors) = response.errors {
220            return Err(anyhow!(
221                "Linear GraphQL errors: {}",
222                errors
223                    .iter()
224                    .map(|e| e.message.as_deref().unwrap_or("unknown error"))
225                    .collect::<Vec<_>>()
226                    .join(", ")
227            ));
228        }
229
230        response
231            .data
232            .ok_or_else(|| anyhow!("Linear response missing data"))
233    }
234
235    /// Query Linear issues
236    #[mutants::skip]
237    fn query_issues(
238        &self,
239        client: &Client,
240    ) -> Result<(Vec<LinearIssue>, Vec<CoverageSlice>, bool)> {
241        let mut slices = Vec::new();
242        let mut partial = false;
243        let filter = self.issue_filter();
244
245        // Build GraphQL query
246        let query = r#"
247            query Issues($first: Int!, $after: String, $filter: IssueFilter) {
248                issues(first: $first, after: $after, filter: $filter) {
249                    nodes {
250                        id
251                        identifier
252                        title
253                        description
254                        state {
255                            id
256                            name
257                            type
258                        }
259                        project {
260                            id
261                            name
262                            key
263                        }
264                        createdAt
265                        completedAt
266                        canceledAt
267                        assignee {
268                            id
269                            name
270                            displayName
271                        }
272                    }
273                    pageInfo {
274                        hasNextPage
275                        endCursor
276                    }
277                }
278            }
279        "#;
280
281        let mut issues = Vec::new();
282        let mut after: Option<String> = None;
283        let mut total_count = 0u64;
284
285        loop {
286            let mut variables = serde_json::json!({
287                "first": 100,
288                "filter": filter,
289            });
290            if let Some(cursor) = &after {
291                variables["after"] = serde_json::json!(cursor);
292            }
293
294            let response: LinearData<LinearIssuesResponse> =
295                self.execute_query(client, query, &variables)?;
296
297            if let Some(issue_connection) = response.data.and_then(|u| u.issues) {
298                if let Some(nodes) = issue_connection.nodes {
299                    let fetched_count = nodes.len() as u64;
300                    total_count += fetched_count;
301                    issues.extend(nodes);
302
303                    // Check for partial results
304                    if issue_connection.page_info.has_next_page {
305                        partial = true;
306                        after = issue_connection.page_info.end_cursor;
307                    } else {
308                        break;
309                    }
310                } else {
311                    break;
312                }
313            } else {
314                break;
315            }
316        }
317
318        // Create coverage slice
319        let query_str = self.coverage_query();
320
321        slices.push(CoverageSlice {
322            window: TimeWindow {
323                since: self.since,
324                until: self.until,
325            },
326            query: query_str,
327            total_count,
328            fetched: issues.len() as u64,
329            incomplete_results: Some(partial),
330            notes: vec!["search:linear".to_string()],
331        });
332
333        Ok((issues, slices, partial))
334    }
335
336    fn issue_filter(&self) -> serde_json::Value {
337        let mut filter = serde_json::json!({
338            "assignee": {
339                "id": {
340                    "eq": self.user,
341                },
342            },
343            "createdAt": {
344                "gte": self.since.format("%Y-%m-%d").to_string(),
345                "lt": self.until.format("%Y-%m-%d").to_string(),
346            },
347        });
348
349        if let Some(state_type) = self.status.linear_state_type() {
350            filter["state"] = serde_json::json!({
351                "type": {
352                    "eq": state_type,
353                },
354            });
355        }
356
357        if let Some(project) = &self.project {
358            filter["project"] = serde_json::json!({
359                "key": {
360                    "eq": project,
361                },
362            });
363        }
364
365        filter
366    }
367
368    fn coverage_query(&self) -> String {
369        let mut parts = vec![
370            format!("assignee.id = '{}'", self.user),
371            format!("createdAt >= '{}'", self.since.format("%Y-%m-%d")),
372            format!("createdAt < '{}'", self.until.format("%Y-%m-%d")),
373        ];
374
375        if let Some(state_type) = self.status.linear_state_type() {
376            parts.push(format!("state.type = '{state_type}'"));
377        }
378
379        if let Some(project) = &self.project {
380            parts.push(format!("project.key = '{project}'"));
381        }
382
383        parts.join(" AND ")
384    }
385
386    /// Convert Linear issues to shiplog events
387    #[mutants::skip]
388    fn issues_to_events(&self, issues: Vec<LinearIssue>) -> Result<Vec<EventEnvelope>> {
389        let mut events = Vec::new();
390        let html_base = self.html_base_url();
391
392        for issue in issues {
393            let issue_url = format!("{}/issue/{}", html_base, issue.identifier);
394
395            // Determine the event timestamp
396            let occurred_at = issue
397                .completed_at
398                .or(issue.canceled_at)
399                .unwrap_or(issue.created_at);
400
401            // Determine the event type based on state
402            let event_type = ManualEventType::Other;
403
404            let event = EventEnvelope {
405                id: EventId::from_parts(["linear", "issue", &issue.id]),
406                kind: EventKind::Manual,
407                occurred_at,
408                actor: Actor {
409                    login: issue
410                        .assignee
411                        .as_ref()
412                        .map(|a| a.name.clone())
413                        .unwrap_or_else(|| self.user.clone()),
414                    id: None, // Linear uses string-based IDs, not u64
415                },
416                repo: RepoRef {
417                    full_name: issue
418                        .project
419                        .as_ref()
420                        .map(|p| format!("linear/{}", p.key))
421                        .unwrap_or_else(|| "linear/unknown".to_string()),
422                    html_url: Some(html_base.clone()),
423                    visibility: RepoVisibility::Private,
424                },
425                payload: EventPayload::Manual(ManualEvent {
426                    event_type,
427                    title: issue.title.clone(),
428                    description: issue.description,
429                    started_at: Some(issue.created_at.date_naive()),
430                    ended_at: issue
431                        .completed_at
432                        .or(issue.canceled_at)
433                        .map(|d| d.date_naive()),
434                    impact: Some(format!("Issue: {}", issue.identifier)),
435                }),
436                tags: vec![],
437                links: vec![Link {
438                    label: "Linear Issue".to_string(),
439                    url: issue_url.clone(),
440                }],
441                source: SourceRef {
442                    system: SourceSystem::Other("linear".to_string()),
443                    url: Some(issue_url),
444                    opaque_id: Some(issue.id),
445                },
446            };
447
448            events.push(event);
449        }
450
451        Ok(events)
452    }
453}
454
455impl Ingestor for LinearIngestor {
456    #[mutants::skip]
457    fn ingest(&self) -> Result<IngestOutput> {
458        if self.since >= self.until {
459            return Err(anyhow!("since must be < until"));
460        }
461
462        let _api_key = self.api_key.as_ref().ok_or_else(|| {
463            anyhow!("Linear API key is required. Set it using with_api_key() or LINEAR_API_KEY environment variable")
464        })?;
465
466        let client = self.client()?;
467        let run_id = RunId::now("shiplog");
468        let mut slices: Vec<CoverageSlice> = Vec::new();
469        let warnings: Vec<String> = Vec::new();
470        let mut completeness = Completeness::Complete;
471
472        let mut events: Vec<EventEnvelope> = Vec::new();
473
474        // Query issues
475        let (issues, query_slices, query_partial) = self.query_issues(&client)?;
476        slices.extend(query_slices);
477        if query_partial {
478            completeness = Completeness::Partial;
479        }
480
481        // Convert issues to events
482        events.extend(self.issues_to_events(issues)?);
483
484        // Sort for stable output
485        events.sort_by_key(|e| e.occurred_at);
486
487        let cov = CoverageManifest {
488            run_id,
489            generated_at: Utc::now(),
490            user: self.user.clone(),
491            window: TimeWindow {
492                since: self.since,
493                until: self.until,
494            },
495            mode: self.status.as_str().to_string(),
496            sources: vec!["linear".to_string()],
497            slices,
498            warnings,
499            completeness,
500        };
501
502        Ok(IngestOutput {
503            events,
504            coverage: cov,
505            freshness: Vec::new(),
506        })
507    }
508}
509
510// Linear API types
511
512#[derive(Debug, Deserialize)]
513struct LinearResponse<T> {
514    data: Option<T>,
515    errors: Option<Vec<LinearError>>,
516}
517
518#[derive(Debug, Deserialize)]
519#[allow(dead_code)]
520struct LinearError {
521    message: Option<String>,
522    #[serde(rename = "type")]
523    type_: Option<String>,
524}
525
526#[derive(Debug, Deserialize)]
527struct LinearData<T> {
528    data: Option<T>,
529}
530
531#[derive(Debug, Deserialize)]
532struct LinearIssuesResponse {
533    issues: Option<LinearIssuesConnection>,
534}
535
536#[derive(Debug, Deserialize)]
537struct LinearIssuesConnection {
538    nodes: Option<Vec<LinearIssue>>,
539    #[serde(rename = "pageInfo")]
540    page_info: LinearPageInfo,
541}
542
543#[derive(Debug, Deserialize)]
544struct LinearPageInfo {
545    #[serde(rename = "hasNextPage")]
546    has_next_page: bool,
547    #[serde(rename = "endCursor")]
548    end_cursor: Option<String>,
549}
550
551#[derive(Debug, Deserialize)]
552struct LinearIssue {
553    id: String,
554    identifier: String,
555    title: String,
556    description: Option<String>,
557    #[allow(dead_code)]
558    state: Option<LinearState>,
559    project: Option<LinearProject>,
560    #[serde(rename = "createdAt")]
561    created_at: DateTime<Utc>,
562    #[serde(rename = "completedAt")]
563    completed_at: Option<DateTime<Utc>>,
564    #[serde(rename = "canceledAt")]
565    canceled_at: Option<DateTime<Utc>>,
566    assignee: Option<LinearUserAccount>,
567}
568
569#[derive(Debug, Deserialize)]
570#[allow(dead_code)]
571struct LinearState {
572    id: String,
573    name: String,
574    #[serde(rename = "type")]
575    type_: String,
576}
577
578#[derive(Debug, Deserialize)]
579#[allow(dead_code)]
580struct LinearProject {
581    id: String,
582    name: String,
583    key: String,
584}
585
586#[derive(Debug, Deserialize)]
587#[allow(dead_code)]
588struct LinearUserAccount {
589    id: String,
590    name: String,
591    #[serde(rename = "displayName")]
592    display_name: String,
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598
599    #[test]
600    fn with_cache_creates_missing_directory() {
601        let temp = tempfile::tempdir().unwrap();
602        let cache_dir = temp.path().join("nested").join("cache");
603
604        let ing = LinearIngestor::new(
605            "alice".to_string(),
606            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
607            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
608        )
609        .with_cache(&cache_dir)
610        .unwrap();
611
612        assert!(ing.cache.is_some());
613        assert!(cache_dir.join("linear-api-cache.db").exists());
614    }
615
616    #[test]
617    fn with_in_memory_cache_works() {
618        let ing = LinearIngestor::new(
619            "alice".to_string(),
620            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
621            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
622        )
623        .with_in_memory_cache()
624        .unwrap();
625
626        assert!(ing.cache.is_some());
627    }
628
629    #[test]
630    fn with_api_key_validates_non_empty() {
631        let result = LinearIngestor::new(
632            "alice".to_string(),
633            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
634            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
635        )
636        .with_api_key("".to_string());
637
638        assert!(result.is_err());
639        assert!(result.unwrap_err().to_string().contains("cannot be empty"));
640    }
641
642    #[test]
643    fn issue_status_from_str() {
644        assert_eq!(
645            "backlog".parse::<IssueStatus>().unwrap(),
646            IssueStatus::Backlog
647        );
648        assert_eq!("todo".parse::<IssueStatus>().unwrap(), IssueStatus::Todo);
649        assert_eq!(
650            "in_progress".parse::<IssueStatus>().unwrap(),
651            IssueStatus::InProgress
652        );
653        assert_eq!("done".parse::<IssueStatus>().unwrap(), IssueStatus::Done);
654        assert_eq!(
655            "cancelled".parse::<IssueStatus>().unwrap(),
656            IssueStatus::Cancelled
657        );
658        assert_eq!("all".parse::<IssueStatus>().unwrap(), IssueStatus::All);
659        assert!("invalid".parse::<IssueStatus>().is_err());
660    }
661
662    #[test]
663    fn issue_status_as_str() {
664        assert_eq!(IssueStatus::Backlog.as_str(), "backlog");
665        assert_eq!(IssueStatus::Todo.as_str(), "todo");
666        assert_eq!(IssueStatus::InProgress.as_str(), "in_progress");
667        assert_eq!(IssueStatus::Done.as_str(), "done");
668        assert_eq!(IssueStatus::Cancelled.as_str(), "cancelled");
669        assert_eq!(IssueStatus::All.as_str(), "all");
670    }
671
672    #[test]
673    fn issue_filter_enforces_date_status_and_project_upstream() {
674        let ing = LinearIngestor::new(
675            "user-uuid".to_string(),
676            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
677            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
678        )
679        .with_status(IssueStatus::InProgress)
680        .with_project("INFRA".to_string());
681
682        let filter = ing.issue_filter();
683        assert_eq!(filter["assignee"]["id"]["eq"], "user-uuid");
684        assert_eq!(filter["createdAt"]["gte"], "2025-01-01");
685        assert_eq!(filter["createdAt"]["lt"], "2025-02-01");
686        assert_eq!(filter["state"]["type"]["eq"], "started");
687        assert_eq!(filter["project"]["key"]["eq"], "INFRA");
688    }
689
690    #[test]
691    fn issue_filter_omits_status_when_all_is_requested() {
692        let ing = LinearIngestor::new(
693            "user-uuid".to_string(),
694            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
695            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
696        )
697        .with_status(IssueStatus::All);
698
699        let filter = ing.issue_filter();
700        assert!(filter.get("state").is_none());
701        assert!(filter.get("project").is_none());
702    }
703
704    #[test]
705    fn coverage_query_records_resolved_filter() {
706        let ing = LinearIngestor::new(
707            "user-uuid".to_string(),
708            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
709            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
710        )
711        .with_status(IssueStatus::Done)
712        .with_project("OPS".to_string());
713
714        let query = ing.coverage_query();
715        assert!(query.contains("assignee.id = 'user-uuid'"));
716        assert!(query.contains("createdAt >= '2025-01-01'"));
717        assert!(query.contains("createdAt < '2025-02-01'"));
718        assert!(query.contains("state.type = 'completed'"));
719        assert!(query.contains("project.key = 'OPS'"));
720    }
721
722    #[test]
723    fn html_base_url_constructs_correctly() {
724        let ing = LinearIngestor::new(
725            "alice".to_string(),
726            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
727            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
728        );
729        assert_eq!(ing.html_base_url(), "https://linear.app");
730    }
731
732    #[test]
733    fn api_base_url_constructs_correctly() {
734        let ing = LinearIngestor::new(
735            "alice".to_string(),
736            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
737            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
738        );
739        assert_eq!(ing.api_base_url(), "https://api.linear.app/graphql");
740    }
741
742    #[test]
743    fn recorded_linear_graphql_payload_deserializes_and_converts() {
744        let payload = serde_json::json!({
745            "data": {
746                "data": {
747                    "issues": {
748                        "nodes": [
749                            {
750                                "id": "issue-uuid-001",
751                                "identifier": "ENG-123",
752                                "title": "Implement API rate limiting",
753                                "description": "Add rate limiting middleware",
754                                "state": {
755                                    "id": "state-1",
756                                    "name": "Done",
757                                    "type": "completed"
758                                },
759                                "project": {
760                                    "id": "proj-1",
761                                    "name": "Backend Infrastructure",
762                                    "key": "INFRA"
763                                },
764                                "createdAt": "2025-01-10T09:00:00Z",
765                                "completedAt": "2025-01-18T16:00:00Z",
766                                "canceledAt": null,
767                                "assignee": {
768                                    "id": "user-1",
769                                    "name": "alice",
770                                    "displayName": "Alice Smith"
771                                }
772                            }
773                        ],
774                        "pageInfo": {
775                            "hasNextPage": false,
776                            "endCursor": null
777                        }
778                    }
779                }
780            }
781        });
782
783        let response: LinearResponse<LinearData<LinearIssuesResponse>> =
784            serde_json::from_value(payload).unwrap();
785        let connection = response.data.unwrap().data.unwrap().issues.unwrap();
786        assert!(!connection.page_info.has_next_page);
787        assert!(connection.page_info.end_cursor.is_none());
788        let issues = connection.nodes.unwrap();
789        assert_eq!(issues[0].state.as_ref().unwrap().type_, "completed");
790        assert_eq!(issues[0].project.as_ref().unwrap().key, "INFRA");
791
792        let ing = LinearIngestor::new(
793            "user-1".to_string(),
794            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
795            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
796        );
797        let events = ing.issues_to_events(issues).unwrap();
798        assert_eq!(events.len(), 1);
799        let event = &events[0];
800        assert_eq!(event.actor.login, "alice");
801        assert_eq!(event.repo.full_name, "linear/INFRA");
802        assert_eq!(
803            event.source.system,
804            SourceSystem::Other("linear".to_string())
805        );
806        assert_eq!(
807            event.source.url.as_deref(),
808            Some("https://linear.app/issue/ENG-123")
809        );
810        if let EventPayload::Manual(manual) = &event.payload {
811            assert_eq!(manual.title, "Implement API rate limiting");
812            assert_eq!(
813                manual.ended_at,
814                Some(NaiveDate::from_ymd_opt(2025, 1, 18).unwrap())
815            );
816        } else {
817            panic!("Expected Manual payload");
818        }
819    }
820
821    // --- Snapshot tests ---
822
823    #[test]
824    fn snapshot_linear_issue_to_event() {
825        let ing = LinearIngestor::new(
826            "alice".to_string(),
827            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
828            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
829        );
830
831        let created = NaiveDate::from_ymd_opt(2025, 1, 10)
832            .unwrap()
833            .and_hms_opt(9, 0, 0)
834            .unwrap()
835            .and_utc();
836        let completed = NaiveDate::from_ymd_opt(2025, 1, 18)
837            .unwrap()
838            .and_hms_opt(16, 0, 0)
839            .unwrap()
840            .and_utc();
841
842        let issues = vec![LinearIssue {
843            id: "issue-uuid-001".to_string(),
844            identifier: "ENG-123".to_string(),
845            title: "Implement API rate limiting".to_string(),
846            description: Some("Add rate limiting middleware to all public endpoints".to_string()),
847            state: Some(LinearState {
848                id: "state-1".to_string(),
849                name: "Done".to_string(),
850                type_: "completed".to_string(),
851            }),
852            project: Some(LinearProject {
853                id: "proj-1".to_string(),
854                name: "Backend Infrastructure".to_string(),
855                key: "INFRA".to_string(),
856            }),
857            created_at: created,
858            completed_at: Some(completed),
859            canceled_at: None,
860            assignee: Some(LinearUserAccount {
861                id: "user-1".to_string(),
862                name: "alice".to_string(),
863                display_name: "Alice Smith".to_string(),
864            }),
865        }];
866
867        let events = ing.issues_to_events(issues).unwrap();
868        insta::assert_yaml_snapshot!(events);
869    }
870
871    #[test]
872    fn snapshot_linear_issue_to_event_minimal() {
873        let ing = LinearIngestor::new(
874            "bob".to_string(),
875            NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
876            NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
877        );
878
879        let created = NaiveDate::from_ymd_opt(2025, 1, 5)
880            .unwrap()
881            .and_hms_opt(11, 0, 0)
882            .unwrap()
883            .and_utc();
884
885        let issues = vec![LinearIssue {
886            id: "issue-uuid-002".to_string(),
887            identifier: "FE-45".to_string(),
888            title: "Fix button alignment".to_string(),
889            description: None,
890            state: None,
891            project: None,
892            created_at: created,
893            completed_at: None,
894            canceled_at: None,
895            assignee: None,
896        }];
897
898        let events = ing.issues_to_events(issues).unwrap();
899        insta::assert_yaml_snapshot!(events);
900    }
901}