Skip to main content

harn_vm/connectors/linear/
mod.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Duration as StdDuration;
4
5use async_trait::async_trait;
6use reqwest::StatusCode;
7use serde::Deserialize;
8use serde_json::{json, Map as JsonMap, Value as JsonValue};
9use time::{Duration, OffsetDateTime};
10use tokio::sync::watch;
11use tokio::task::JoinHandle;
12
13use crate::connectors::{
14    ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15    HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::secrets::{SecretId, SecretVersion};
18use crate::triggers::{
19    redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
20    TriggerEvent, TriggerEventId,
21};
22
23#[cfg(test)]
24mod tests;
25
/// Provider identifier under which this connector registers itself.
pub const LINEAR_PROVIDER_ID: &str = "linear";
/// GraphQL endpoint used when a call does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.linear.app/graphql";
/// Base timestamp window, in seconds, used when verifying webhook signatures.
const DEFAULT_REPLAY_WINDOW_SECS: i64 = 60;
/// Grace period, in seconds, added to the replay window when a binding
/// configures none.
const DEFAULT_REPLAY_GRACE_SECS: i64 = 15;
/// Seconds between webhook-monitor health probes when no interval is configured.
const DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS: u64 = 60;
/// Consecutive healthy probes required before the monitor re-enables a webhook
/// when no threshold is configured.
const DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD: u32 = 5;
// NOTE(review): not referenced in the visible part of this file; presumably the
// GraphQL complexity level above which a warning is surfaced — confirm at the
// use site.
const COMPLEXITY_WARNING_THRESHOLD: i64 = 5_000;
33
34pub struct LinearConnector {
35    provider_id: ProviderId,
36    kinds: Vec<TriggerKind>,
37    state: Arc<LinearConnectorState>,
38    client: Arc<LinearClient>,
39}
40
41#[derive(Default)]
42struct LinearConnectorState {
43    ctx: RwLock<Option<ConnectorCtx>>,
44    bindings: RwLock<HashMap<String, ActivatedLinearBinding>>,
45    monitor_tasks: Mutex<Vec<JoinHandle<()>>>,
46    monitor_shutdown: Mutex<Option<watch::Sender<bool>>>,
47}
48
49#[derive(Clone, Debug)]
50struct ActivatedLinearBinding {
51    #[allow(dead_code)]
52    binding_id: String,
53    path: Option<String>,
54    signing_secret: SecretId,
55    replay_grace_secs: i64,
56    monitor: Option<LinearWebhookMonitor>,
57}
58
59struct LinearClient {
60    provider_id: ProviderId,
61    state: Arc<LinearConnectorState>,
62    http: reqwest::Client,
63}
64
65#[derive(Debug, Default, Deserialize)]
66struct LinearBindingConfig {
67    #[serde(default, rename = "match")]
68    match_config: LinearMatchConfig,
69    #[serde(default)]
70    secrets: LinearSecretsConfig,
71    #[serde(default)]
72    security: LinearSecurityConfig,
73    #[serde(default)]
74    replay_grace_secs: Option<i64>,
75    #[serde(default)]
76    monitor: Option<LinearWebhookMonitorConfig>,
77}
78
79#[derive(Debug, Default, Deserialize)]
80struct LinearMatchConfig {
81    path: Option<String>,
82}
83
84#[derive(Clone, Debug, Default, Deserialize)]
85struct LinearSecretsConfig {
86    signing_secret: Option<String>,
87    access_token: Option<String>,
88    api_key: Option<String>,
89}
90
91#[derive(Debug, Default, Deserialize)]
92struct LinearSecurityConfig {
93    replay_grace_secs: Option<i64>,
94    timestamp_grace_secs: Option<i64>,
95}
96
97#[derive(Clone, Debug, Default, Deserialize)]
98struct LinearClientConfigArgs {
99    api_base_url: Option<String>,
100    api_key: Option<String>,
101    api_key_secret: Option<String>,
102    access_token: Option<String>,
103    access_token_secret: Option<String>,
104    #[serde(default)]
105    secrets: LinearSecretsConfig,
106}
107
108#[derive(Clone, Debug, Default, Deserialize)]
109struct LinearWebhookMonitorConfig {
110    #[serde(default)]
111    enabled: Option<bool>,
112    webhook_id: Option<String>,
113    health_url: Option<String>,
114    #[serde(default)]
115    probe_interval_secs: Option<u64>,
116    #[serde(default)]
117    probe_interval_ms: Option<u64>,
118    #[serde(default)]
119    success_threshold: Option<u32>,
120    #[serde(flatten)]
121    client: LinearClientConfigArgs,
122}
123
124#[derive(Clone, Debug)]
125struct ResolvedLinearClientConfig {
126    api_base_url: String,
127    auth: LinearAuthSource,
128}
129
130#[derive(Clone, Debug)]
131enum LinearAuthSource {
132    ApiKeyInline(String),
133    ApiKeySecret(SecretId),
134    AccessTokenInline(String),
135    AccessTokenSecret(SecretId),
136}
137
138#[derive(Clone, Debug)]
139struct LinearWebhookMonitor {
140    webhook_id: String,
141    health_url: String,
142    probe_interval: StdDuration,
143    success_threshold: u32,
144    client_config: ResolvedLinearClientConfig,
145}
146
147#[derive(Debug, Deserialize)]
148struct ListIssuesArgs {
149    #[serde(flatten)]
150    config: LinearClientConfigArgs,
151    #[serde(default)]
152    filter: Option<JsonValue>,
153    #[serde(default)]
154    first: Option<i64>,
155    #[serde(default)]
156    after: Option<String>,
157    #[serde(default)]
158    include_archived: Option<bool>,
159}
160
161#[derive(Debug, Deserialize)]
162struct UpdateIssueArgs {
163    #[serde(flatten)]
164    config: LinearClientConfigArgs,
165    id: String,
166    changes: JsonValue,
167}
168
169#[derive(Debug, Deserialize)]
170struct CreateCommentArgs {
171    #[serde(flatten)]
172    config: LinearClientConfigArgs,
173    issue_id: String,
174    body: String,
175}
176
177#[derive(Debug, Deserialize)]
178struct SearchArgs {
179    #[serde(flatten)]
180    config: LinearClientConfigArgs,
181    query: String,
182    #[serde(default)]
183    first: Option<i64>,
184}
185
186#[derive(Debug, Deserialize)]
187struct GraphqlArgs {
188    #[serde(flatten)]
189    config: LinearClientConfigArgs,
190    query: String,
191    #[serde(default)]
192    variables: Option<JsonValue>,
193    #[serde(default)]
194    operation_name: Option<String>,
195}
196
197impl LinearConnector {
198    pub fn new() -> Self {
199        let state = Arc::new(LinearConnectorState::default());
200        let client = Arc::new(LinearClient {
201            provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
202            state: state.clone(),
203            http: crate::connectors::outbound_http_client("harn-linear-connector"),
204        });
205        Self {
206            provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
207            kinds: vec![TriggerKind::from("webhook")],
208            state,
209            client,
210        }
211    }
212
213    fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedLinearBinding, ConnectorError> {
214        let bindings = self
215            .state
216            .bindings
217            .read()
218            .expect("linear connector bindings poisoned");
219        if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
220            return bindings.get(binding_id).cloned().ok_or_else(|| {
221                ConnectorError::Unsupported(format!(
222                    "linear connector has no active binding `{binding_id}`"
223                ))
224            });
225        }
226        if bindings.len() == 1 {
227            return bindings
228                .values()
229                .next()
230                .cloned()
231                .ok_or_else(|| ConnectorError::Activation("linear bindings missing".to_string()));
232        }
233        Err(ConnectorError::Unsupported(
234            "linear connector requires raw.metadata.binding_id when multiple bindings are active"
235                .to_string(),
236        ))
237    }
238
239    fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
240        self.state
241            .ctx
242            .read()
243            .expect("linear connector ctx poisoned")
244            .clone()
245            .ok_or_else(|| {
246                ConnectorError::Activation(
247                    "linear connector must be initialized before use".to_string(),
248                )
249            })
250    }
251
252    fn stop_monitors(&self) {
253        if let Some(shutdown) = self
254            .state
255            .monitor_shutdown
256            .lock()
257            .expect("linear connector monitor shutdown poisoned")
258            .take()
259        {
260            let _ = shutdown.send(true);
261        }
262        let mut tasks = self
263            .state
264            .monitor_tasks
265            .lock()
266            .expect("linear connector monitor tasks poisoned");
267        for task in tasks.drain(..) {
268            task.abort();
269        }
270    }
271}
272
273impl Default for LinearConnector {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
279impl Drop for LinearConnector {
280    fn drop(&mut self) {
281        self.stop_monitors();
282    }
283}
284
285#[async_trait]
286impl Connector for LinearConnector {
287    fn provider_id(&self) -> &ProviderId {
288        &self.provider_id
289    }
290
291    fn kinds(&self) -> &[TriggerKind] {
292        &self.kinds
293    }
294
295    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
296        *self
297            .state
298            .ctx
299            .write()
300            .expect("linear connector ctx poisoned") = Some(ctx);
301        Ok(())
302    }
303
304    async fn activate(
305        &self,
306        bindings: &[TriggerBinding],
307    ) -> Result<ActivationHandle, ConnectorError> {
308        self.stop_monitors();
309        let mut configured = HashMap::new();
310        let mut paths = BTreeSet::new();
311        for binding in bindings {
312            let activated = ActivatedLinearBinding::from_binding(binding)?;
313            if let Some(path) = &activated.path {
314                if !paths.insert(path.clone()) {
315                    return Err(ConnectorError::Activation(format!(
316                        "linear connector path `{path}` is configured by multiple bindings"
317                    )));
318                }
319            }
320            configured.insert(binding.binding_id.clone(), activated);
321        }
322        let (monitor_shutdown, _) = watch::channel(false);
323        *self
324            .state
325            .bindings
326            .write()
327            .expect("linear connector bindings poisoned") = configured;
328        *self
329            .state
330            .monitor_shutdown
331            .lock()
332            .expect("linear connector monitor shutdown poisoned") = Some(monitor_shutdown.clone());
333        let tasks = {
334            let bindings = self
335                .state
336                .bindings
337                .read()
338                .expect("linear connector bindings poisoned");
339            bindings
340                .values()
341                .filter_map(|binding| {
342                    binding.monitor.clone().map(|monitor| {
343                        let client = self.client.clone();
344                        let shutdown = monitor_shutdown.subscribe();
345                        tokio::spawn(async move {
346                            run_webhook_monitor(client, monitor, shutdown).await;
347                        })
348                    })
349                })
350                .collect::<Vec<_>>()
351        };
352        *self
353            .state
354            .monitor_tasks
355            .lock()
356            .expect("linear connector monitor tasks poisoned") = tasks;
357        Ok(ActivationHandle::new(
358            self.provider_id.clone(),
359            bindings.len(),
360        ))
361    }
362
363    async fn shutdown(&self, deadline: StdDuration) -> Result<(), ConnectorError> {
364        if let Some(shutdown) = self
365            .state
366            .monitor_shutdown
367            .lock()
368            .expect("linear connector monitor shutdown poisoned")
369            .take()
370        {
371            let _ = shutdown.send(true);
372        }
373        let pending = self
374            .state
375            .monitor_tasks
376            .lock()
377            .expect("linear connector monitor tasks poisoned")
378            .drain(..)
379            .collect::<Vec<_>>();
380        if pending.is_empty() {
381            return Ok(());
382        }
383        let wait_all = async {
384            for task in pending {
385                let _ = task.await;
386            }
387        };
388        tokio::time::timeout(deadline, wait_all)
389            .await
390            .map_err(|_| {
391                ConnectorError::Activation(format!(
392                    "linear connector shutdown exceeded {}s",
393                    deadline.as_secs()
394                ))
395            })?;
396        Ok(())
397    }
398
399    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
400        let ctx = self.ctx()?;
401        let binding = self.binding_for_raw(&raw)?;
402        let provider = self.provider_id.clone();
403        let received_at = raw.received_at;
404        let headers = effective_headers(&raw.headers);
405        let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
406        let timestamp_window =
407            Duration::seconds(DEFAULT_REPLAY_WINDOW_SECS + binding.replay_grace_secs.max(0));
408        let verify = futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
409            ctx.event_log.as_ref(),
410            &provider,
411            HmacSignatureStyle::linear(),
412            &raw.body,
413            &headers,
414            secret.as_str(),
415            Some(timestamp_window),
416            received_at,
417        ));
418        if let Err(error) = verify {
419            if matches!(error, ConnectorError::TimestampOutOfWindow { .. }) {
420                ctx.metrics.record_linear_timestamp_rejection();
421            }
422            return Err(error);
423        }
424
425        let payload = raw.json_body()?;
426        let dedupe_key = header_value(&headers, "linear-delivery")
427            .map(ToString::to_string)
428            .or_else(|| {
429                payload
430                    .get("webhookId")
431                    .and_then(JsonValue::as_str)
432                    .map(ToString::to_string)
433            })
434            .unwrap_or_else(|| fallback_body_digest(&raw.body));
435        let kind = linear_trigger_kind(&payload);
436        let provider_payload = ProviderPayload::normalize(&provider, &kind, &headers, payload)
437            .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
438        Ok(TriggerEvent {
439            id: TriggerEventId::new(),
440            provider,
441            kind,
442            received_at,
443            occurred_at: raw
444                .occurred_at
445                .or_else(|| infer_occurred_at(&provider_payload)),
446            dedupe_key,
447            trace_id: TraceId::new(),
448            tenant_id: raw.tenant_id.clone(),
449            headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
450            batch: None,
451            raw_body: Some(raw.body.clone()),
452            provider_payload,
453            signature_status: SignatureStatus::Verified,
454            dedupe_claimed: false,
455        })
456    }
457
458    fn payload_schema(&self) -> ProviderPayloadSchema {
459        ProviderPayloadSchema::named("LinearEventPayload")
460    }
461
462    fn client(&self) -> Arc<dyn ConnectorClient> {
463        self.client.clone()
464    }
465}
466
467#[async_trait]
468impl ConnectorClient for LinearClient {
469    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
470        match method {
471            "list_issues" => {
472                let args: ListIssuesArgs = parse_args(args)?;
473                let config = self.resolve_client_config(&args.config)?;
474                let envelope = self
475                    .request_graphql(
476                        &config,
477                        "query ListIssues($filter: IssueFilter, $first: Int, $after: String, $includeArchived: Boolean) { issues(filter: $filter, first: $first, after: $after, includeArchived: $includeArchived) { nodes { id identifier title priority estimate dueDate url createdAt updatedAt state { id name type } team { id key name } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } pageInfo { hasNextPage endCursor } } }",
478                        json!({
479                            "filter": args.filter,
480                            "first": args.first,
481                            "after": args.after,
482                            "includeArchived": args.include_archived,
483                        }),
484                        Some("ListIssues"),
485                    )
486                    .await?;
487                extract_graphql_field(envelope, "issues")
488            }
489            "update_issue" => {
490                let args: UpdateIssueArgs = parse_args(args)?;
491                let config = self.resolve_client_config(&args.config)?;
492                let envelope = self
493                    .request_graphql(
494                        &config,
495                        "mutation UpdateIssue($id: String!, $input: IssueUpdateInput!) { issueUpdate(id: $id, input: $input) { success issue { id identifier title priority estimate dueDate url updatedAt state { id name type } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } } }",
496                        json!({
497                            "id": args.id,
498                            "input": args.changes,
499                        }),
500                        Some("UpdateIssue"),
501                    )
502                    .await?;
503                extract_graphql_field(envelope, "issueUpdate")
504            }
505            "create_comment" => {
506                let args: CreateCommentArgs = parse_args(args)?;
507                let config = self.resolve_client_config(&args.config)?;
508                let envelope = self
509                    .request_graphql(
510                        &config,
511                        "mutation CreateComment($input: CommentCreateInput!) { commentCreate(input: $input) { success comment { id body url createdAt user { id name } issue { id identifier title } } } }",
512                        json!({
513                            "input": {
514                                "issueId": args.issue_id,
515                                "body": args.body,
516                            }
517                        }),
518                        Some("CreateComment"),
519                    )
520                    .await?;
521                extract_graphql_field(envelope, "commentCreate")
522            }
523            "search" => {
524                let args: SearchArgs = parse_args(args)?;
525                let config = self.resolve_client_config(&args.config)?;
526                self.search_issues(&config, &args.query, args.first).await
527            }
528            "graphql" => {
529                let args: GraphqlArgs = parse_args(args)?;
530                let config = self.resolve_client_config(&args.config)?;
531                self.request_graphql(
532                    &config,
533                    &args.query,
534                    args.variables.unwrap_or(JsonValue::Null),
535                    args.operation_name.as_deref(),
536                )
537                .await
538            }
539            other => Err(ClientError::MethodNotFound(format!(
540                "linear connector does not implement outbound method `{other}`"
541            ))),
542        }
543    }
544}
545
546impl LinearClient {
547    fn resolve_client_config(
548        &self,
549        args: &LinearClientConfigArgs,
550    ) -> Result<ResolvedLinearClientConfig, ClientError> {
551        resolve_client_config_args(args).map_err(ClientError::InvalidArgs)
552    }
553
554    fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
555        self.state
556            .ctx
557            .read()
558            .expect("linear connector ctx poisoned")
559            .clone()
560            .ok_or_else(|| ClientError::Other("linear connector must be initialized".to_string()))
561    }
562
563    async fn request_graphql(
564        &self,
565        config: &ResolvedLinearClientConfig,
566        query: &str,
567        variables: JsonValue,
568        operation_name: Option<&str>,
569    ) -> Result<JsonValue, ClientError> {
570        let ctx = self.ctx()?;
571        ctx.rate_limiter
572            .scoped(&self.provider_id, "graphql")
573            .acquire()
574            .await;
575
576        let auth = self.auth_header(config).await?;
577        if let Some(error) =
578            crate::egress::client_error_for_url("connector_call:linear", &config.api_base_url)
579        {
580            return Err(error);
581        }
582        let mut request_body = JsonMap::new();
583        request_body.insert("query".to_string(), JsonValue::String(query.to_string()));
584        if !variables.is_null() {
585            request_body.insert("variables".to_string(), variables);
586        }
587        if let Some(operation_name) = operation_name {
588            request_body.insert(
589                "operationName".to_string(),
590                JsonValue::String(operation_name.to_string()),
591            );
592        }
593
594        let response = self
595            .http
596            .post(config.api_base_url.clone())
597            .header("Content-Type", "application/json")
598            .header("Authorization", auth)
599            .json(&JsonValue::Object(request_body))
600            .send()
601            .await
602            .map_err(|error| ClientError::Transport(error.to_string()))?;
603
604        let status = response.status();
605        let meta = graphql_meta(response.headers(), query);
606        let payload = response
607            .json::<JsonValue>()
608            .await
609            .map_err(|error| ClientError::Transport(error.to_string()))?;
610        let errors = payload.get("errors").cloned();
611        if is_rate_limited(status, errors.as_ref()) {
612            return Err(ClientError::RateLimited(graphql_error_message(
613                status,
614                errors.as_ref(),
615            )));
616        }
617        if !status.is_success() {
618            return Err(ClientError::Transport(graphql_error_message(
619                status,
620                errors.as_ref(),
621            )));
622        }
623
624        Ok(json!({
625            "data": payload.get("data").cloned().unwrap_or(JsonValue::Null),
626            "errors": errors.unwrap_or(JsonValue::Null),
627            "meta": meta,
628        }))
629    }
630
631    async fn auth_header(
632        &self,
633        config: &ResolvedLinearClientConfig,
634    ) -> Result<String, ClientError> {
635        match &config.auth {
636            LinearAuthSource::ApiKeyInline(value) => Ok(value.clone()),
637            LinearAuthSource::AccessTokenInline(value) => Ok(format!("Bearer {value}")),
638            LinearAuthSource::ApiKeySecret(secret_id) => {
639                let secret = self.secret_text(secret_id).await?;
640                Ok(secret)
641            }
642            LinearAuthSource::AccessTokenSecret(secret_id) => {
643                let secret = self.secret_text(secret_id).await?;
644                Ok(format!("Bearer {secret}"))
645            }
646        }
647    }
648
649    async fn secret_text(&self, secret_id: &SecretId) -> Result<String, ClientError> {
650        let ctx = self.ctx()?;
651        let secret = ctx
652            .secrets
653            .get(secret_id)
654            .await
655            .map_err(|error| ClientError::Other(error.to_string()))?;
656        Ok(secret.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string()))
657    }
658
659    async fn search_issues(
660        &self,
661        config: &ResolvedLinearClientConfig,
662        query: &str,
663        first: Option<i64>,
664    ) -> Result<JsonValue, ClientError> {
665        let searches = [
666            (
667                "query SearchIssues($query: String!, $first: Int) { searchIssues(query: $query, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
668                "SearchIssues",
669            ),
670            (
671                "query SearchIssues($term: String!, $first: Int) { searchIssues(term: $term, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
672                "SearchIssues",
673            ),
674        ];
675
676        for (index, (document, operation_name)) in searches.into_iter().enumerate() {
677            let variables = if document.contains("$query") {
678                json!({ "query": query, "first": first })
679            } else {
680                json!({ "term": query, "first": first })
681            };
682            match self
683                .request_graphql(config, document, variables, Some(operation_name))
684                .await
685            {
686                Ok(envelope) => return extract_graphql_field(envelope, "searchIssues"),
687                Err(ClientError::Transport(message))
688                    if index == 0
689                        && (message.contains("Unknown argument")
690                            || message.contains("GRAPHQL_VALIDATION_FAILED")) => {}
691                Err(error) => return Err(error),
692            }
693        }
694
695        Err(ClientError::Transport(
696            "linear connector searchIssues query failed".to_string(),
697        ))
698    }
699
700    async fn probe_health(&self, health_url: &str) -> Result<bool, ClientError> {
701        if let Some(error) =
702            crate::egress::client_error_for_url("connector_call:linear", health_url)
703        {
704            return Err(error);
705        }
706        let response = self
707            .http
708            .get(health_url)
709            .send()
710            .await
711            .map_err(|error| ClientError::Transport(error.to_string()))?;
712        Ok(response.status() == StatusCode::OK)
713    }
714
715    async fn reenable_webhook(
716        &self,
717        config: &ResolvedLinearClientConfig,
718        webhook_id: &str,
719    ) -> Result<(), ClientError> {
720        let envelope = self
721            .request_graphql(
722                config,
723                "mutation ReenableWebhook($id: String!, $input: WebhookUpdateInput!) { webhookUpdate(id: $id, input: $input) { success webhook { id enabled } } }",
724                json!({
725                    "id": webhook_id,
726                    "input": { "enabled": true },
727                }),
728                Some("ReenableWebhook"),
729            )
730            .await?;
731        let payload = envelope
732            .get("data")
733            .and_then(|value| value.get("webhookUpdate"))
734            .ok_or_else(|| {
735                ClientError::Transport(
736                    "linear connector response missing `webhookUpdate` GraphQL field".to_string(),
737                )
738            })?;
739        if payload
740            .get("success")
741            .and_then(JsonValue::as_bool)
742            .unwrap_or(false)
743        {
744            Ok(())
745        } else {
746            Err(ClientError::Transport(
747                "linear connector webhook re-enable returned success = false".to_string(),
748            ))
749        }
750    }
751}
752
753impl ActivatedLinearBinding {
754    fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
755        let config: LinearBindingConfig =
756            serde_json::from_value(binding.config.clone()).map_err(|error| {
757                ConnectorError::Activation(format!(
758                    "linear binding `{}` has invalid config: {error}",
759                    binding.binding_id
760                ))
761            })?;
762        let signing_secret =
763            parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
764                ConnectorError::Activation(format!(
765                    "linear binding `{}` requires secrets.signing_secret",
766                    binding.binding_id
767                ))
768            })?;
769        let replay_grace_secs = config
770            .replay_grace_secs
771            .or(config.security.replay_grace_secs)
772            .or(config.security.timestamp_grace_secs)
773            .unwrap_or(DEFAULT_REPLAY_GRACE_SECS);
774        let monitor = config
775            .monitor
776            .map(|monitor| {
777                LinearWebhookMonitor::from_config(&binding.binding_id, monitor, &config.secrets)
778            })
779            .transpose()
780            .map_err(ConnectorError::Activation)?
781            .flatten();
782        Ok(Self {
783            binding_id: binding.binding_id.clone(),
784            path: config.match_config.path,
785            signing_secret,
786            replay_grace_secs,
787            monitor,
788        })
789    }
790}
791
792impl LinearWebhookMonitor {
793    fn from_config(
794        binding_id: &str,
795        config: LinearWebhookMonitorConfig,
796        binding_secrets: &LinearSecretsConfig,
797    ) -> Result<Option<Self>, String> {
798        if config.enabled == Some(false) {
799            return Ok(None);
800        }
801        let webhook_id = config
802            .webhook_id
803            .filter(|value| !value.trim().is_empty())
804            .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires webhook_id"))?;
805        let health_url = config
806            .health_url
807            .filter(|value| !value.trim().is_empty())
808            .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires health_url"))?;
809        let mut client = config.client;
810        if client.access_token.is_none()
811            && client.access_token_secret.is_none()
812            && client.api_key.is_none()
813            && client.api_key_secret.is_none()
814            && client.secrets.access_token.is_none()
815            && client.secrets.api_key.is_none()
816        {
817            client.secrets.access_token = binding_secrets.access_token.clone();
818            client.secrets.api_key = binding_secrets.api_key.clone();
819        }
820        let client_config = resolve_client_config_args(&client).map_err(|error| {
821            format!("linear binding `{binding_id}` monitor auth is invalid: {error}")
822        })?;
823        Ok(Some(Self {
824            webhook_id,
825            health_url,
826            probe_interval: config
827                .probe_interval_ms
828                .map(|ms| StdDuration::from_millis(ms.max(1)))
829                .unwrap_or_else(|| {
830                    StdDuration::from_secs(
831                        config
832                            .probe_interval_secs
833                            .unwrap_or(DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS)
834                            .max(1),
835                    )
836                }),
837            success_threshold: config
838                .success_threshold
839                .unwrap_or(DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD)
840                .max(1),
841            client_config,
842        }))
843    }
844}
845
846async fn run_webhook_monitor(
847    client: Arc<LinearClient>,
848    monitor: LinearWebhookMonitor,
849    mut shutdown: watch::Receiver<bool>,
850) {
851    let mut consecutive_successes = 0u32;
852    loop {
853        tokio::select! {
854            changed = shutdown.changed() => {
855                if changed.is_err() || *shutdown.borrow() {
856                    break;
857                }
858            }
859            _ = tokio::time::sleep(monitor.probe_interval) => {}
860        }
861        if *shutdown.borrow() {
862            break;
863        }
864        match client.probe_health(&monitor.health_url).await {
865            Ok(true) => {
866                consecutive_successes = consecutive_successes.saturating_add(1);
867                if consecutive_successes >= monitor.success_threshold
868                    && client
869                        .reenable_webhook(&monitor.client_config, &monitor.webhook_id)
870                        .await
871                        .is_ok()
872                {
873                    consecutive_successes = 0;
874                }
875            }
876            Ok(false) | Err(_) => {
877                consecutive_successes = 0;
878            }
879        }
880    }
881}
882
883fn resolve_client_config_args(
884    args: &LinearClientConfigArgs,
885) -> Result<ResolvedLinearClientConfig, String> {
886    let auth = if let Some(secret_id) = args
887        .access_token_secret
888        .as_deref()
889        .or(args.secrets.access_token.as_deref())
890        .and_then(|value| parse_secret_id(Some(value)))
891    {
892        LinearAuthSource::AccessTokenSecret(secret_id)
893    } else if let Some(secret_id) = args
894        .api_key_secret
895        .as_deref()
896        .or(args.secrets.api_key.as_deref())
897        .and_then(|value| parse_secret_id(Some(value)))
898    {
899        LinearAuthSource::ApiKeySecret(secret_id)
900    } else if let Some(token) = args.access_token.clone() {
901        LinearAuthSource::AccessTokenInline(token)
902    } else if let Some(api_key) = args.api_key.clone() {
903        LinearAuthSource::ApiKeyInline(api_key)
904    } else {
905        return Err(
906            "linear connector requires access_token, access_token_secret, api_key, or api_key_secret"
907                .to_string(),
908        );
909    };
910
911    Ok(ResolvedLinearClientConfig {
912        api_base_url: args
913            .api_base_url
914            .clone()
915            .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
916        auth,
917    })
918}
919
920fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
921    let mut effective = headers.clone();
922    for (raw, canonical) in [
923        ("content-type", "Content-Type"),
924        ("linear-signature", "Linear-Signature"),
925        ("linear-delivery", "Linear-Delivery"),
926        ("linear-event", "Linear-Event"),
927    ] {
928        if let Some(value) = header_value(headers, raw) {
929            effective
930                .entry(canonical.to_string())
931                .or_insert_with(|| value.to_string());
932        }
933    }
934    effective
935}
936
937fn linear_trigger_kind(payload: &JsonValue) -> String {
938    let event = payload
939        .get("type")
940        .and_then(JsonValue::as_str)
941        .map(|kind| {
942            let lower = kind.to_ascii_lowercase();
943            match lower.as_str() {
944                "issue" => "issue".to_string(),
945                "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
946                "issuelabel" | "issue_label" => "issue_label".to_string(),
947                "project" | "projectupdate" | "project_update" => "project".to_string(),
948                "cycle" => "cycle".to_string(),
949                "customer" => "customer".to_string(),
950                "customerrequest" | "customer_request" => "customer_request".to_string(),
951                _ => lower,
952            }
953        })
954        .unwrap_or_else(|| "other".to_string());
955    let action = payload
956        .get("action")
957        .and_then(JsonValue::as_str)
958        .unwrap_or("update");
959    format!("{event}.{action}")
960}
961
962fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
963    let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Linear(payload)) =
964        payload
965    else {
966        return None;
967    };
968    let timestamp = match payload {
969        crate::triggers::event::LinearEventPayload::Issue(payload) => {
970            payload.common.webhook_timestamp
971        }
972        crate::triggers::event::LinearEventPayload::IssueComment(payload) => {
973            payload.common.webhook_timestamp
974        }
975        crate::triggers::event::LinearEventPayload::IssueLabel(payload) => {
976            payload.common.webhook_timestamp
977        }
978        crate::triggers::event::LinearEventPayload::Project(payload) => {
979            payload.common.webhook_timestamp
980        }
981        crate::triggers::event::LinearEventPayload::Cycle(payload) => {
982            payload.common.webhook_timestamp
983        }
984        crate::triggers::event::LinearEventPayload::Customer(payload) => {
985            payload.common.webhook_timestamp
986        }
987        crate::triggers::event::LinearEventPayload::CustomerRequest(payload) => {
988            payload.common.webhook_timestamp
989        }
990        crate::triggers::event::LinearEventPayload::Other(payload) => payload.webhook_timestamp,
991    }?;
992    OffsetDateTime::from_unix_timestamp_nanos(i128::from(timestamp) * 1_000_000).ok()
993}
994
995fn parse_args<T: for<'de> Deserialize<'de>>(args: JsonValue) -> Result<T, ClientError> {
996    serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
997}
998
999fn load_secret_text_blocking(
1000    ctx: &ConnectorCtx,
1001    secret_id: &SecretId,
1002) -> Result<String, ConnectorError> {
1003    let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1004    secret.with_exposed(|bytes| {
1005        std::str::from_utf8(bytes)
1006            .map(|value| value.to_string())
1007            .map_err(|error| {
1008                ConnectorError::Secret(format!(
1009                    "secret '{}' is not valid UTF-8: {error}",
1010                    secret_id
1011                ))
1012            })
1013    })
1014}
1015
1016fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1017    let trimmed = raw?.trim();
1018    if trimmed.is_empty() {
1019        return None;
1020    }
1021    let (base, version) = match trimmed.rsplit_once('@') {
1022        Some((base, version_text)) => {
1023            let version = version_text.parse::<u64>().ok()?;
1024            (base, SecretVersion::Exact(version))
1025        }
1026        None => (trimmed, SecretVersion::Latest),
1027    };
1028    let (namespace, name) = base.split_once('/')?;
1029    if namespace.is_empty() || name.is_empty() {
1030        return None;
1031    }
1032    Some(SecretId::new(namespace, name).with_version(version))
1033}
1034
/// Looks up a header by name, ignoring ASCII case.
///
/// Returns the value of the first key (in the map's sorted order) that matches
/// `name`, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    headers.iter().find_map(|(key, value)| {
        key.eq_ignore_ascii_case(name)
            .then_some(value.as_str())
    })
}
1041
1042fn fallback_body_digest(body: &[u8]) -> String {
1043    use sha2::Digest;
1044
1045    let digest = sha2::Sha256::digest(body);
1046    let mut encoded = String::with_capacity(digest.len() * 2);
1047    for byte in digest {
1048        encoded.push_str(&format!("{byte:02x}"));
1049    }
1050    format!("sha256:{encoded}")
1051}
1052
1053fn graphql_meta(headers: &reqwest::header::HeaderMap, query: &str) -> JsonValue {
1054    let estimated = estimate_query_complexity(query);
1055    let observed = header_i64(headers, "x-complexity");
1056    let complexity = observed.or(estimated);
1057    let complexity_warning = complexity.is_some_and(|value| value >= COMPLEXITY_WARNING_THRESHOLD);
1058    json!({
1059        "complexity_estimate": estimated,
1060        "observed_complexity": observed,
1061        "complexity_warning": complexity_warning,
1062        "rate_limit": {
1063            "requests_limit": header_i64(headers, "x-ratelimit-requests-limit"),
1064            "requests_remaining": header_i64(headers, "x-ratelimit-requests-remaining"),
1065            "requests_reset": header_i64(headers, "x-ratelimit-requests-reset"),
1066            "complexity_limit": header_i64(headers, "x-ratelimit-complexity-limit"),
1067            "complexity_remaining": header_i64(headers, "x-ratelimit-complexity-remaining"),
1068            "complexity_reset": header_i64(headers, "x-ratelimit-complexity-reset"),
1069        },
1070    })
1071}
1072
1073fn header_i64(headers: &reqwest::header::HeaderMap, name: &str) -> Option<i64> {
1074    headers
1075        .get(name)
1076        .and_then(|value| value.to_str().ok())
1077        .and_then(|value| value.parse::<i64>().ok())
1078}
1079
/// Heuristically estimates the complexity of a GraphQL query string.
///
/// Returns `None` for an empty or whitespace-only query. Otherwise the score
/// starts at 1 and adds:
/// - the number of `{` and `}` characters (at least 1),
/// - for every `first:` / `last:` occurrence, the integer literal that follows
///   (after optional whitespace), or 50 when no parseable integer follows
///   (for example a `$variable`),
/// - 50 for every occurrence of the text `nodes`.
///
/// All additions saturate at `i64::MAX`, so a query carrying an absurd page
/// size such as `first: 9223372036854775807` cannot overflow the score.
fn estimate_query_complexity(query: &str) -> Option<i64> {
    const UNSIZED_PAGE_COST: i64 = 50;
    const NODES_COST: i64 = 50;

    let normalized = query.trim();
    if normalized.is_empty() {
        return None;
    }

    // Occurrence counts are bounded by the string length (<= isize::MAX), so
    // the usize -> i64 conversions below cannot lose information.
    let brace_count = normalized
        .matches(|ch: char| ch == '{' || ch == '}')
        .count() as i64;
    let mut score = 1i64.saturating_add(brace_count.max(1));

    for marker in ["first:", "last:"] {
        let mut rest = normalized;
        while let Some(index) = rest.find(marker) {
            let after = &rest[index + marker.len()..];
            // ASCII digits are one byte each, so the digit count is also the
            // byte length of the numeric prefix.
            let argument = after.trim_start();
            let digit_len = argument
                .bytes()
                .take_while(u8::is_ascii_digit)
                .count();
            let page_cost = argument[..digit_len]
                .parse::<i64>()
                .unwrap_or(UNSIZED_PAGE_COST);
            score = score.saturating_add(page_cost);
            rest = after;
        }
    }

    let nodes_count = normalized.matches("nodes").count() as i64;
    Some(score.saturating_add(nodes_count.saturating_mul(NODES_COST)))
}
1109
1110fn extract_graphql_field(mut envelope: JsonValue, field: &str) -> Result<JsonValue, ClientError> {
1111    let meta = envelope.get("meta").cloned().unwrap_or(JsonValue::Null);
1112    let Some(data) = envelope.get_mut("data").and_then(JsonValue::as_object_mut) else {
1113        return Err(ClientError::Transport(
1114            "linear connector response missing GraphQL data".to_string(),
1115        ));
1116    };
1117    let mut extracted = data.remove(field).ok_or_else(|| {
1118        ClientError::Transport(format!(
1119            "linear connector response missing `{field}` GraphQL field"
1120        ))
1121    })?;
1122    if let Some(object) = extracted.as_object_mut() {
1123        object.insert("meta".to_string(), meta);
1124    }
1125    Ok(extracted)
1126}
1127
1128fn is_rate_limited(status: StatusCode, errors: Option<&JsonValue>) -> bool {
1129    if status == StatusCode::TOO_MANY_REQUESTS {
1130        return true;
1131    }
1132    if status == StatusCode::BAD_REQUEST {
1133        return graphql_error_codes(errors)
1134            .into_iter()
1135            .any(|code| code.eq_ignore_ascii_case("RATELIMITED"));
1136    }
1137    false
1138}
1139
1140fn graphql_error_codes(errors: Option<&JsonValue>) -> Vec<String> {
1141    errors
1142        .and_then(JsonValue::as_array)
1143        .map(|errors| {
1144            errors
1145                .iter()
1146                .filter_map(|error| {
1147                    error
1148                        .get("extensions")
1149                        .and_then(JsonValue::as_object)
1150                        .and_then(|extensions| extensions.get("code"))
1151                        .and_then(JsonValue::as_str)
1152                        .map(ToString::to_string)
1153                })
1154                .collect()
1155        })
1156        .unwrap_or_default()
1157}
1158
1159fn graphql_error_message(status: StatusCode, errors: Option<&JsonValue>) -> String {
1160    let messages = errors
1161        .and_then(JsonValue::as_array)
1162        .map(|errors| {
1163            errors
1164                .iter()
1165                .filter_map(|error| {
1166                    error
1167                        .get("message")
1168                        .and_then(JsonValue::as_str)
1169                        .map(ToString::to_string)
1170                })
1171                .collect::<Vec<_>>()
1172        })
1173        .unwrap_or_default();
1174    if messages.is_empty() {
1175        format!("linear GraphQL request failed with status {status}")
1176    } else {
1177        format!(
1178            "linear GraphQL request failed with status {status}: {}",
1179            messages.join("; ")
1180        )
1181    }
1182}