Skip to main content

harn_vm/connectors/linear/
mod.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Duration as StdDuration;
4
5use async_trait::async_trait;
6use reqwest::StatusCode;
7use serde::Deserialize;
8use serde_json::{json, Map as JsonMap, Value as JsonValue};
9use time::{Duration, OffsetDateTime};
10use tokio::sync::watch;
11use tokio::task::JoinHandle;
12
13use crate::connectors::{
14    ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15    HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::secrets::{SecretId, SecretVersion};
18use crate::triggers::{
19    redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
20    TriggerEvent, TriggerEventId,
21};
22
23#[cfg(test)]
24mod tests;
25
/// Provider id string the connector and its client register under.
pub const LINEAR_PROVIDER_ID: &str = "linear";
/// GraphQL endpoint used when a call does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.linear.app/graphql";
/// Base timestamp window (seconds) for webhook signature verification; the
/// per-binding grace is added on top of it.
const DEFAULT_REPLAY_WINDOW_SECS: i64 = 60;
/// Grace (seconds) used when a binding configures no replay/timestamp grace.
const DEFAULT_REPLAY_GRACE_SECS: i64 = 15;
/// Health-probe interval (seconds) used when the monitor config sets none.
const DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS: u64 = 60;
/// Consecutive healthy probes required before the monitor re-enables a webhook,
/// when the monitor config sets no threshold.
const DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD: u32 = 5;
// NOTE(review): not referenced in this part of the file; presumably consumed by
// the GraphQL metadata helpers further down — confirm.
const COMPLEXITY_WARNING_THRESHOLD: i64 = 5_000;
33
/// Connector for the Linear provider: normalizes inbound webhooks and exposes
/// an outbound GraphQL client.
pub struct LinearConnector {
    /// Provider id reported by `Connector::provider_id`.
    provider_id: ProviderId,
    /// Trigger kinds reported by `Connector::kinds` (`"webhook"` when built via `new`).
    kinds: Vec<TriggerKind>,
    /// Mutable runtime state, shared with `client`.
    state: Arc<LinearConnectorState>,
    /// Outbound client handed out by `Connector::client`.
    client: Arc<LinearClient>,
}
40
/// Runtime state shared between the connector and its outbound client.
#[derive(Default)]
struct LinearConnectorState {
    /// Context stored by `init`; `None` until then.
    ctx: RwLock<Option<ConnectorCtx>>,
    /// Active bindings keyed by binding id; replaced wholesale by `activate`.
    bindings: RwLock<HashMap<String, ActivatedLinearBinding>>,
    /// Handles of the spawned webhook-monitor tasks.
    monitor_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Sender used to signal the monitor tasks to stop; `None` when no activation is live.
    monitor_shutdown: Mutex<Option<watch::Sender<bool>>>,
}
48
/// Validated, ready-to-use form of one trigger binding's configuration.
#[derive(Clone, Debug)]
struct ActivatedLinearBinding {
    #[allow(dead_code)]
    binding_id: String,
    /// Optional `match.path`; `activate` rejects two bindings that share one.
    path: Option<String>,
    /// Secret holding the webhook signing key used for HMAC verification.
    signing_secret: SecretId,
    /// Extra seconds added to the default replay window (negative values count as 0).
    replay_grace_secs: i64,
    /// Webhook health monitor settings, when one is configured and enabled.
    monitor: Option<LinearWebhookMonitor>,
}
58
/// Outbound Linear GraphQL client; shares its state with the connector.
struct LinearClient {
    /// Provider id used to scope the rate limiter.
    provider_id: ProviderId,
    /// Shared connector state; read to obtain the `ConnectorCtx`.
    state: Arc<LinearConnectorState>,
    /// HTTP client used for GraphQL requests and health probes.
    http: reqwest::Client,
}
64
/// Shape of a trigger binding's `config` value as deserialized by
/// `ActivatedLinearBinding::from_binding`. Every section is optional.
#[derive(Debug, Default, Deserialize)]
struct LinearBindingConfig {
    /// Read from the `match` key.
    #[serde(default, rename = "match")]
    match_config: LinearMatchConfig,
    #[serde(default)]
    secrets: LinearSecretsConfig,
    #[serde(default)]
    security: LinearSecurityConfig,
    /// Top-level grace; takes precedence over the values under `security`.
    #[serde(default)]
    replay_grace_secs: Option<i64>,
    #[serde(default)]
    monitor: Option<LinearWebhookMonitorConfig>,
}
78
/// `match` section of a binding config.
#[derive(Debug, Default, Deserialize)]
struct LinearMatchConfig {
    /// Optional path; must be unique across the bindings passed to `activate`.
    path: Option<String>,
}
83
/// Secret references; each value is parsed with `parse_secret_id` where it is used.
#[derive(Clone, Debug, Default, Deserialize)]
struct LinearSecretsConfig {
    /// Webhook signing secret reference; required on a binding.
    signing_secret: Option<String>,
    /// Access-token secret reference for outbound calls.
    access_token: Option<String>,
    /// API-key secret reference for outbound calls.
    api_key: Option<String>,
}
90
/// `security` section of a binding config. Consulted only when the top-level
/// `replay_grace_secs` is absent; `replay_grace_secs` wins over
/// `timestamp_grace_secs`.
#[derive(Debug, Default, Deserialize)]
struct LinearSecurityConfig {
    replay_grace_secs: Option<i64>,
    timestamp_grace_secs: Option<i64>,
}
96
/// Endpoint and credential options accepted by outbound calls and by the
/// webhook monitor; resolved by `resolve_client_config_args`.
#[derive(Clone, Debug, Default, Deserialize)]
struct LinearClientConfigArgs {
    /// GraphQL endpoint override; defaults to `DEFAULT_API_BASE_URL`.
    api_base_url: Option<String>,
    /// Inline API key.
    api_key: Option<String>,
    /// Secret reference holding an API key.
    api_key_secret: Option<String>,
    /// Inline access token.
    access_token: Option<String>,
    /// Secret reference holding an access token.
    access_token_secret: Option<String>,
    /// Nested secret references, used when the `*_secret` fields above are absent.
    #[serde(default)]
    secrets: LinearSecretsConfig,
}
107
/// `monitor` section of a binding config, validated by
/// `LinearWebhookMonitor::from_config`.
#[derive(Clone, Debug, Default, Deserialize)]
struct LinearWebhookMonitorConfig {
    /// Only an explicit `false` disables the monitor.
    #[serde(default)]
    enabled: Option<bool>,
    /// Id of the webhook to re-enable; required and must be non-blank.
    webhook_id: Option<String>,
    /// URL probed for health; required and must be non-blank.
    health_url: Option<String>,
    /// Probe interval in seconds; ignored when `probe_interval_ms` is set.
    #[serde(default)]
    probe_interval_secs: Option<u64>,
    /// Probe interval in milliseconds; takes precedence over the seconds value.
    #[serde(default)]
    probe_interval_ms: Option<u64>,
    /// Consecutive healthy probes required before re-enabling the webhook.
    #[serde(default)]
    success_threshold: Option<u32>,
    /// Client/credential options read from the same object (flattened).
    #[serde(flatten)]
    client: LinearClientConfigArgs,
}
123
/// Client configuration after defaults and credential precedence are applied.
#[derive(Clone, Debug)]
struct ResolvedLinearClientConfig {
    /// GraphQL endpoint requests are posted to.
    api_base_url: String,
    /// Credential source used to build the `Authorization` header.
    auth: LinearAuthSource,
}
129
/// Where the outbound credential comes from. API keys are sent as the raw
/// `Authorization` value; access tokens are prefixed with `Bearer `.
#[derive(Clone, Debug)]
enum LinearAuthSource {
    /// API key supplied inline.
    ApiKeyInline(String),
    /// API key fetched from the secret store on each request.
    ApiKeySecret(SecretId),
    /// Access token supplied inline.
    AccessTokenInline(String),
    /// Access token fetched from the secret store on each request.
    AccessTokenSecret(SecretId),
}
137
/// Validated settings for one background webhook-health monitor task.
#[derive(Clone, Debug)]
struct LinearWebhookMonitor {
    /// Id of the webhook passed to the `webhookUpdate` mutation.
    webhook_id: String,
    /// URL probed with a GET request; only HTTP 200 counts as healthy.
    health_url: String,
    /// Delay between probes.
    probe_interval: StdDuration,
    /// Consecutive healthy probes required before re-enabling (at least 1).
    success_threshold: u32,
    /// Credentials and endpoint used for the re-enable mutation.
    client_config: ResolvedLinearClientConfig,
}
146
/// Arguments of the `list_issues` outbound method. The optional fields are
/// forwarded as the GraphQL variables `filter`, `first`, `after` and
/// `includeArchived`.
#[derive(Debug, Deserialize)]
struct ListIssuesArgs {
    /// Client/credential options read from the same object (flattened).
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    #[serde(default)]
    filter: Option<JsonValue>,
    #[serde(default)]
    first: Option<i64>,
    #[serde(default)]
    after: Option<String>,
    #[serde(default)]
    include_archived: Option<bool>,
}
160
/// Arguments of the `update_issue` outbound method.
#[derive(Debug, Deserialize)]
struct UpdateIssueArgs {
    /// Client/credential options read from the same object (flattened).
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// Sent as the `$id` variable of the `issueUpdate` mutation.
    id: String,
    /// Sent unchanged as the `$input` variable of the `issueUpdate` mutation.
    changes: JsonValue,
}
168
/// Arguments of the `create_comment` outbound method.
#[derive(Debug, Deserialize)]
struct CreateCommentArgs {
    /// Client/credential options read from the same object (flattened).
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// Sent as `input.issueId`.
    issue_id: String,
    /// Sent as `input.body`.
    body: String,
}
176
/// Arguments of the `search` outbound method.
#[derive(Debug, Deserialize)]
struct SearchArgs {
    /// Client/credential options read from the same object (flattened).
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// Search text passed to `searchIssues`.
    query: String,
    /// Optional page size forwarded as `$first`.
    #[serde(default)]
    first: Option<i64>,
}
185
/// Arguments of the raw `graphql` outbound method.
#[derive(Debug, Deserialize)]
struct GraphqlArgs {
    /// Client/credential options read from the same object (flattened).
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// GraphQL document sent as-is.
    query: String,
    /// Optional variables; omitted from the request when absent or null.
    #[serde(default)]
    variables: Option<JsonValue>,
    /// Optional `operationName` for the request.
    #[serde(default)]
    operation_name: Option<String>,
}
196
197impl LinearConnector {
198    pub fn new() -> Self {
199        let state = Arc::new(LinearConnectorState::default());
200        let client = Arc::new(LinearClient {
201            provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
202            state: state.clone(),
203            http: crate::connectors::outbound_http_client("harn-linear-connector"),
204        });
205        Self {
206            provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
207            kinds: vec![TriggerKind::from("webhook")],
208            state,
209            client,
210        }
211    }
212
213    fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedLinearBinding, ConnectorError> {
214        let bindings = self
215            .state
216            .bindings
217            .read()
218            .expect("linear connector bindings poisoned");
219        if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
220            return bindings.get(binding_id).cloned().ok_or_else(|| {
221                ConnectorError::Unsupported(format!(
222                    "linear connector has no active binding `{binding_id}`"
223                ))
224            });
225        }
226        if bindings.len() == 1 {
227            return bindings
228                .values()
229                .next()
230                .cloned()
231                .ok_or_else(|| ConnectorError::Activation("linear bindings missing".to_string()));
232        }
233        Err(ConnectorError::Unsupported(
234            "linear connector requires raw.metadata.binding_id when multiple bindings are active"
235                .to_string(),
236        ))
237    }
238
239    fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
240        self.state
241            .ctx
242            .read()
243            .expect("linear connector ctx poisoned")
244            .clone()
245            .ok_or_else(|| {
246                ConnectorError::Activation(
247                    "linear connector must be initialized before use".to_string(),
248                )
249            })
250    }
251
252    fn stop_monitors(&self) {
253        if let Some(shutdown) = self
254            .state
255            .monitor_shutdown
256            .lock()
257            .expect("linear connector monitor shutdown poisoned")
258            .take()
259        {
260            let _ = shutdown.send(true);
261        }
262        let mut tasks = self
263            .state
264            .monitor_tasks
265            .lock()
266            .expect("linear connector monitor tasks poisoned");
267        for task in tasks.drain(..) {
268            task.abort();
269        }
270    }
271}
272
impl Default for LinearConnector {
    /// Equivalent to [`LinearConnector::new`].
    fn default() -> Self {
        Self::new()
    }
}
278
impl Drop for LinearConnector {
    /// Stops the background monitor tasks when the connector is dropped.
    fn drop(&mut self) {
        self.stop_monitors();
    }
}
284
285#[async_trait]
286impl Connector for LinearConnector {
287    fn provider_id(&self) -> &ProviderId {
288        &self.provider_id
289    }
290
291    fn kinds(&self) -> &[TriggerKind] {
292        &self.kinds
293    }
294
295    async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
296        *self
297            .state
298            .ctx
299            .write()
300            .expect("linear connector ctx poisoned") = Some(ctx);
301        Ok(())
302    }
303
304    async fn activate(
305        &self,
306        bindings: &[TriggerBinding],
307    ) -> Result<ActivationHandle, ConnectorError> {
308        self.stop_monitors();
309        let mut configured = HashMap::new();
310        let mut paths = BTreeSet::new();
311        for binding in bindings {
312            let activated = ActivatedLinearBinding::from_binding(binding)?;
313            if let Some(path) = &activated.path {
314                if !paths.insert(path.clone()) {
315                    return Err(ConnectorError::Activation(format!(
316                        "linear connector path `{path}` is configured by multiple bindings"
317                    )));
318                }
319            }
320            configured.insert(binding.binding_id.clone(), activated);
321        }
322        let (monitor_shutdown, _) = watch::channel(false);
323        *self
324            .state
325            .bindings
326            .write()
327            .expect("linear connector bindings poisoned") = configured;
328        *self
329            .state
330            .monitor_shutdown
331            .lock()
332            .expect("linear connector monitor shutdown poisoned") = Some(monitor_shutdown.clone());
333        let tasks = {
334            let bindings = self
335                .state
336                .bindings
337                .read()
338                .expect("linear connector bindings poisoned");
339            bindings
340                .values()
341                .filter_map(|binding| {
342                    binding.monitor.clone().map(|monitor| {
343                        let client = self.client.clone();
344                        let shutdown = monitor_shutdown.subscribe();
345                        tokio::spawn(async move {
346                            run_webhook_monitor(client, monitor, shutdown).await;
347                        })
348                    })
349                })
350                .collect::<Vec<_>>()
351        };
352        *self
353            .state
354            .monitor_tasks
355            .lock()
356            .expect("linear connector monitor tasks poisoned") = tasks;
357        Ok(ActivationHandle::new(
358            self.provider_id.clone(),
359            bindings.len(),
360        ))
361    }
362
363    async fn shutdown(&self, deadline: StdDuration) -> Result<(), ConnectorError> {
364        if let Some(shutdown) = self
365            .state
366            .monitor_shutdown
367            .lock()
368            .expect("linear connector monitor shutdown poisoned")
369            .take()
370        {
371            let _ = shutdown.send(true);
372        }
373        let pending = self
374            .state
375            .monitor_tasks
376            .lock()
377            .expect("linear connector monitor tasks poisoned")
378            .drain(..)
379            .collect::<Vec<_>>();
380        if pending.is_empty() {
381            return Ok(());
382        }
383        let wait_all = async {
384            for task in pending {
385                let _ = task.await;
386            }
387        };
388        tokio::time::timeout(deadline, wait_all)
389            .await
390            .map_err(|_| {
391                ConnectorError::Activation(format!(
392                    "linear connector shutdown exceeded {}s",
393                    deadline.as_secs()
394                ))
395            })?;
396        Ok(())
397    }
398
399    async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
400        let ctx = self.ctx()?;
401        let binding = self.binding_for_raw(&raw)?;
402        let provider = self.provider_id.clone();
403        let received_at = raw.received_at;
404        let headers = effective_headers(&raw.headers);
405        let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
406        let timestamp_window =
407            Duration::seconds(DEFAULT_REPLAY_WINDOW_SECS + binding.replay_grace_secs.max(0));
408        let verify = futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
409            ctx.event_log.as_ref(),
410            &provider,
411            HmacSignatureStyle::linear(),
412            &raw.body,
413            &headers,
414            secret.as_str(),
415            Some(timestamp_window),
416            received_at,
417        ));
418        if let Err(error) = verify {
419            if matches!(error, ConnectorError::TimestampOutOfWindow { .. }) {
420                ctx.metrics.record_linear_timestamp_rejection();
421            }
422            return Err(error);
423        }
424
425        let payload = raw.json_body()?;
426        let dedupe_key = header_value(&headers, "linear-delivery")
427            .map(ToString::to_string)
428            .or_else(|| {
429                payload
430                    .get("webhookId")
431                    .and_then(JsonValue::as_str)
432                    .map(ToString::to_string)
433            })
434            .unwrap_or_else(|| fallback_body_digest(&raw.body));
435        let kind = linear_trigger_kind(&payload);
436        let provider_payload = ProviderPayload::normalize(&provider, &kind, &headers, payload)
437            .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
438        Ok(TriggerEvent {
439            id: TriggerEventId::new(),
440            provider,
441            kind,
442            received_at,
443            occurred_at: raw
444                .occurred_at
445                .or_else(|| infer_occurred_at(&provider_payload)),
446            dedupe_key,
447            trace_id: TraceId::new(),
448            tenant_id: raw.tenant_id.clone(),
449            headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
450            batch: None,
451            raw_body: Some(raw.body.clone()),
452            provider_payload,
453            signature_status: SignatureStatus::Verified,
454            dedupe_claimed: false,
455        })
456    }
457
458    fn payload_schema(&self) -> ProviderPayloadSchema {
459        ProviderPayloadSchema::named("LinearEventPayload")
460    }
461
462    fn client(&self) -> Arc<dyn ConnectorClient> {
463        self.client.clone()
464    }
465}
466
467#[async_trait]
468impl ConnectorClient for LinearClient {
469    async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
470        match method {
471            "list_issues" => {
472                let args: ListIssuesArgs = parse_args(args)?;
473                let config = self.resolve_client_config(&args.config)?;
474                let envelope = self
475                    .request_graphql(
476                        &config,
477                        "query ListIssues($filter: IssueFilter, $first: Int, $after: String, $includeArchived: Boolean) { issues(filter: $filter, first: $first, after: $after, includeArchived: $includeArchived) { nodes { id identifier title priority estimate dueDate url createdAt updatedAt state { id name type } team { id key name } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } pageInfo { hasNextPage endCursor } } }",
478                        json!({
479                            "filter": args.filter,
480                            "first": args.first,
481                            "after": args.after,
482                            "includeArchived": args.include_archived,
483                        }),
484                        Some("ListIssues"),
485                    )
486                    .await?;
487                extract_graphql_field(envelope, "issues")
488            }
489            "update_issue" => {
490                let args: UpdateIssueArgs = parse_args(args)?;
491                let config = self.resolve_client_config(&args.config)?;
492                let envelope = self
493                    .request_graphql(
494                        &config,
495                        "mutation UpdateIssue($id: String!, $input: IssueUpdateInput!) { issueUpdate(id: $id, input: $input) { success issue { id identifier title priority estimate dueDate url updatedAt state { id name type } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } } }",
496                        json!({
497                            "id": args.id,
498                            "input": args.changes,
499                        }),
500                        Some("UpdateIssue"),
501                    )
502                    .await?;
503                extract_graphql_field(envelope, "issueUpdate")
504            }
505            "create_comment" => {
506                let args: CreateCommentArgs = parse_args(args)?;
507                let config = self.resolve_client_config(&args.config)?;
508                let envelope = self
509                    .request_graphql(
510                        &config,
511                        "mutation CreateComment($input: CommentCreateInput!) { commentCreate(input: $input) { success comment { id body url createdAt user { id name } issue { id identifier title } } } }",
512                        json!({
513                            "input": {
514                                "issueId": args.issue_id,
515                                "body": args.body,
516                            }
517                        }),
518                        Some("CreateComment"),
519                    )
520                    .await?;
521                extract_graphql_field(envelope, "commentCreate")
522            }
523            "search" => {
524                let args: SearchArgs = parse_args(args)?;
525                let config = self.resolve_client_config(&args.config)?;
526                self.search_issues(&config, &args.query, args.first).await
527            }
528            "graphql" => {
529                let args: GraphqlArgs = parse_args(args)?;
530                let config = self.resolve_client_config(&args.config)?;
531                self.request_graphql(
532                    &config,
533                    &args.query,
534                    args.variables.unwrap_or(JsonValue::Null),
535                    args.operation_name.as_deref(),
536                )
537                .await
538            }
539            other => Err(ClientError::MethodNotFound(format!(
540                "linear connector does not implement outbound method `{other}`"
541            ))),
542        }
543    }
544}
545
546impl LinearClient {
547    fn resolve_client_config(
548        &self,
549        args: &LinearClientConfigArgs,
550    ) -> Result<ResolvedLinearClientConfig, ClientError> {
551        resolve_client_config_args(args).map_err(ClientError::InvalidArgs)
552    }
553
554    fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
555        self.state
556            .ctx
557            .read()
558            .expect("linear connector ctx poisoned")
559            .clone()
560            .ok_or_else(|| ClientError::Other("linear connector must be initialized".to_string()))
561    }
562
563    async fn request_graphql(
564        &self,
565        config: &ResolvedLinearClientConfig,
566        query: &str,
567        variables: JsonValue,
568        operation_name: Option<&str>,
569    ) -> Result<JsonValue, ClientError> {
570        let ctx = self.ctx()?;
571        ctx.rate_limiter
572            .scoped(&self.provider_id, "graphql")
573            .acquire()
574            .await;
575
576        let auth = self.auth_header(config).await?;
577        let mut request_body = JsonMap::new();
578        request_body.insert("query".to_string(), JsonValue::String(query.to_string()));
579        if !variables.is_null() {
580            request_body.insert("variables".to_string(), variables);
581        }
582        if let Some(operation_name) = operation_name {
583            request_body.insert(
584                "operationName".to_string(),
585                JsonValue::String(operation_name.to_string()),
586            );
587        }
588
589        let response = self
590            .http
591            .post(config.api_base_url.clone())
592            .header("Content-Type", "application/json")
593            .header("Authorization", auth)
594            .json(&JsonValue::Object(request_body))
595            .send()
596            .await
597            .map_err(|error| ClientError::Transport(error.to_string()))?;
598
599        let status = response.status();
600        let meta = graphql_meta(response.headers(), query);
601        let payload = response
602            .json::<JsonValue>()
603            .await
604            .map_err(|error| ClientError::Transport(error.to_string()))?;
605        let errors = payload.get("errors").cloned();
606        if is_rate_limited(status, errors.as_ref()) {
607            return Err(ClientError::RateLimited(graphql_error_message(
608                status,
609                errors.as_ref(),
610            )));
611        }
612        if !status.is_success() {
613            return Err(ClientError::Transport(graphql_error_message(
614                status,
615                errors.as_ref(),
616            )));
617        }
618
619        Ok(json!({
620            "data": payload.get("data").cloned().unwrap_or(JsonValue::Null),
621            "errors": errors.unwrap_or(JsonValue::Null),
622            "meta": meta,
623        }))
624    }
625
626    async fn auth_header(
627        &self,
628        config: &ResolvedLinearClientConfig,
629    ) -> Result<String, ClientError> {
630        match &config.auth {
631            LinearAuthSource::ApiKeyInline(value) => Ok(value.clone()),
632            LinearAuthSource::AccessTokenInline(value) => Ok(format!("Bearer {value}")),
633            LinearAuthSource::ApiKeySecret(secret_id) => {
634                let secret = self.secret_text(secret_id).await?;
635                Ok(secret)
636            }
637            LinearAuthSource::AccessTokenSecret(secret_id) => {
638                let secret = self.secret_text(secret_id).await?;
639                Ok(format!("Bearer {secret}"))
640            }
641        }
642    }
643
644    async fn secret_text(&self, secret_id: &SecretId) -> Result<String, ClientError> {
645        let ctx = self.ctx()?;
646        let secret = ctx
647            .secrets
648            .get(secret_id)
649            .await
650            .map_err(|error| ClientError::Other(error.to_string()))?;
651        Ok(secret.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string()))
652    }
653
654    async fn search_issues(
655        &self,
656        config: &ResolvedLinearClientConfig,
657        query: &str,
658        first: Option<i64>,
659    ) -> Result<JsonValue, ClientError> {
660        let searches = [
661            (
662                "query SearchIssues($query: String!, $first: Int) { searchIssues(query: $query, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
663                "SearchIssues",
664            ),
665            (
666                "query SearchIssues($term: String!, $first: Int) { searchIssues(term: $term, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
667                "SearchIssues",
668            ),
669        ];
670
671        for (index, (document, operation_name)) in searches.into_iter().enumerate() {
672            let variables = if document.contains("$query") {
673                json!({ "query": query, "first": first })
674            } else {
675                json!({ "term": query, "first": first })
676            };
677            match self
678                .request_graphql(config, document, variables, Some(operation_name))
679                .await
680            {
681                Ok(envelope) => return extract_graphql_field(envelope, "searchIssues"),
682                Err(ClientError::Transport(message))
683                    if index == 0
684                        && (message.contains("Unknown argument")
685                            || message.contains("GRAPHQL_VALIDATION_FAILED")) => {}
686                Err(error) => return Err(error),
687            }
688        }
689
690        Err(ClientError::Transport(
691            "linear connector searchIssues query failed".to_string(),
692        ))
693    }
694
695    async fn probe_health(&self, health_url: &str) -> Result<bool, ClientError> {
696        let response = self
697            .http
698            .get(health_url)
699            .send()
700            .await
701            .map_err(|error| ClientError::Transport(error.to_string()))?;
702        Ok(response.status() == StatusCode::OK)
703    }
704
705    async fn reenable_webhook(
706        &self,
707        config: &ResolvedLinearClientConfig,
708        webhook_id: &str,
709    ) -> Result<(), ClientError> {
710        let envelope = self
711            .request_graphql(
712                config,
713                "mutation ReenableWebhook($id: String!, $input: WebhookUpdateInput!) { webhookUpdate(id: $id, input: $input) { success webhook { id enabled } } }",
714                json!({
715                    "id": webhook_id,
716                    "input": { "enabled": true },
717                }),
718                Some("ReenableWebhook"),
719            )
720            .await?;
721        let payload = envelope
722            .get("data")
723            .and_then(|value| value.get("webhookUpdate"))
724            .ok_or_else(|| {
725                ClientError::Transport(
726                    "linear connector response missing `webhookUpdate` GraphQL field".to_string(),
727                )
728            })?;
729        if payload
730            .get("success")
731            .and_then(JsonValue::as_bool)
732            .unwrap_or(false)
733        {
734            Ok(())
735        } else {
736            Err(ClientError::Transport(
737                "linear connector webhook re-enable returned success = false".to_string(),
738            ))
739        }
740    }
741}
742
743impl ActivatedLinearBinding {
744    fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
745        let config: LinearBindingConfig =
746            serde_json::from_value(binding.config.clone()).map_err(|error| {
747                ConnectorError::Activation(format!(
748                    "linear binding `{}` has invalid config: {error}",
749                    binding.binding_id
750                ))
751            })?;
752        let signing_secret =
753            parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
754                ConnectorError::Activation(format!(
755                    "linear binding `{}` requires secrets.signing_secret",
756                    binding.binding_id
757                ))
758            })?;
759        let replay_grace_secs = config
760            .replay_grace_secs
761            .or(config.security.replay_grace_secs)
762            .or(config.security.timestamp_grace_secs)
763            .unwrap_or(DEFAULT_REPLAY_GRACE_SECS);
764        let monitor = config
765            .monitor
766            .map(|monitor| {
767                LinearWebhookMonitor::from_config(&binding.binding_id, monitor, &config.secrets)
768            })
769            .transpose()
770            .map_err(ConnectorError::Activation)?
771            .flatten();
772        Ok(Self {
773            binding_id: binding.binding_id.clone(),
774            path: config.match_config.path,
775            signing_secret,
776            replay_grace_secs,
777            monitor,
778        })
779    }
780}
781
782impl LinearWebhookMonitor {
783    fn from_config(
784        binding_id: &str,
785        config: LinearWebhookMonitorConfig,
786        binding_secrets: &LinearSecretsConfig,
787    ) -> Result<Option<Self>, String> {
788        if config.enabled == Some(false) {
789            return Ok(None);
790        }
791        let webhook_id = config
792            .webhook_id
793            .filter(|value| !value.trim().is_empty())
794            .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires webhook_id"))?;
795        let health_url = config
796            .health_url
797            .filter(|value| !value.trim().is_empty())
798            .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires health_url"))?;
799        let mut client = config.client;
800        if client.access_token.is_none()
801            && client.access_token_secret.is_none()
802            && client.api_key.is_none()
803            && client.api_key_secret.is_none()
804            && client.secrets.access_token.is_none()
805            && client.secrets.api_key.is_none()
806        {
807            client.secrets.access_token = binding_secrets.access_token.clone();
808            client.secrets.api_key = binding_secrets.api_key.clone();
809        }
810        let client_config = resolve_client_config_args(&client).map_err(|error| {
811            format!("linear binding `{binding_id}` monitor auth is invalid: {error}")
812        })?;
813        Ok(Some(Self {
814            webhook_id,
815            health_url,
816            probe_interval: config
817                .probe_interval_ms
818                .map(|ms| StdDuration::from_millis(ms.max(1)))
819                .unwrap_or_else(|| {
820                    StdDuration::from_secs(
821                        config
822                            .probe_interval_secs
823                            .unwrap_or(DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS)
824                            .max(1),
825                    )
826                }),
827            success_threshold: config
828                .success_threshold
829                .unwrap_or(DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD)
830                .max(1),
831            client_config,
832        }))
833    }
834}
835
836async fn run_webhook_monitor(
837    client: Arc<LinearClient>,
838    monitor: LinearWebhookMonitor,
839    mut shutdown: watch::Receiver<bool>,
840) {
841    let mut consecutive_successes = 0u32;
842    loop {
843        tokio::select! {
844            changed = shutdown.changed() => {
845                if changed.is_err() || *shutdown.borrow() {
846                    break;
847                }
848            }
849            _ = tokio::time::sleep(monitor.probe_interval) => {}
850        }
851        if *shutdown.borrow() {
852            break;
853        }
854        match client.probe_health(&monitor.health_url).await {
855            Ok(true) => {
856                consecutive_successes = consecutive_successes.saturating_add(1);
857                if consecutive_successes >= monitor.success_threshold
858                    && client
859                        .reenable_webhook(&monitor.client_config, &monitor.webhook_id)
860                        .await
861                        .is_ok()
862                {
863                    consecutive_successes = 0;
864                }
865            }
866            Ok(false) | Err(_) => {
867                consecutive_successes = 0;
868            }
869        }
870    }
871}
872
873fn resolve_client_config_args(
874    args: &LinearClientConfigArgs,
875) -> Result<ResolvedLinearClientConfig, String> {
876    let auth = if let Some(secret_id) = args
877        .access_token_secret
878        .as_deref()
879        .or(args.secrets.access_token.as_deref())
880        .and_then(|value| parse_secret_id(Some(value)))
881    {
882        LinearAuthSource::AccessTokenSecret(secret_id)
883    } else if let Some(secret_id) = args
884        .api_key_secret
885        .as_deref()
886        .or(args.secrets.api_key.as_deref())
887        .and_then(|value| parse_secret_id(Some(value)))
888    {
889        LinearAuthSource::ApiKeySecret(secret_id)
890    } else if let Some(token) = args.access_token.clone() {
891        LinearAuthSource::AccessTokenInline(token)
892    } else if let Some(api_key) = args.api_key.clone() {
893        LinearAuthSource::ApiKeyInline(api_key)
894    } else {
895        return Err(
896            "linear connector requires access_token, access_token_secret, api_key, or api_key_secret"
897                .to_string(),
898        );
899    };
900
901    Ok(ResolvedLinearClientConfig {
902        api_base_url: args
903            .api_base_url
904            .clone()
905            .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
906        auth,
907    })
908}
909
910fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
911    let mut effective = headers.clone();
912    for (raw, canonical) in [
913        ("content-type", "Content-Type"),
914        ("linear-signature", "Linear-Signature"),
915        ("linear-delivery", "Linear-Delivery"),
916        ("linear-event", "Linear-Event"),
917    ] {
918        if let Some(value) = header_value(headers, raw) {
919            effective
920                .entry(canonical.to_string())
921                .or_insert_with(|| value.to_string());
922        }
923    }
924    effective
925}
926
927fn linear_trigger_kind(payload: &JsonValue) -> String {
928    let event = payload
929        .get("type")
930        .and_then(JsonValue::as_str)
931        .map(|kind| {
932            let lower = kind.to_ascii_lowercase();
933            match lower.as_str() {
934                "issue" => "issue".to_string(),
935                "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
936                "issuelabel" | "issue_label" => "issue_label".to_string(),
937                "project" | "projectupdate" | "project_update" => "project".to_string(),
938                "cycle" => "cycle".to_string(),
939                "customer" => "customer".to_string(),
940                "customerrequest" | "customer_request" => "customer_request".to_string(),
941                _ => lower,
942            }
943        })
944        .unwrap_or_else(|| "other".to_string());
945    let action = payload
946        .get("action")
947        .and_then(JsonValue::as_str)
948        .unwrap_or("update");
949    format!("{event}.{action}")
950}
951
952fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
953    let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Linear(payload)) =
954        payload
955    else {
956        return None;
957    };
958    let timestamp = match payload {
959        crate::triggers::event::LinearEventPayload::Issue(payload) => {
960            payload.common.webhook_timestamp
961        }
962        crate::triggers::event::LinearEventPayload::IssueComment(payload) => {
963            payload.common.webhook_timestamp
964        }
965        crate::triggers::event::LinearEventPayload::IssueLabel(payload) => {
966            payload.common.webhook_timestamp
967        }
968        crate::triggers::event::LinearEventPayload::Project(payload) => {
969            payload.common.webhook_timestamp
970        }
971        crate::triggers::event::LinearEventPayload::Cycle(payload) => {
972            payload.common.webhook_timestamp
973        }
974        crate::triggers::event::LinearEventPayload::Customer(payload) => {
975            payload.common.webhook_timestamp
976        }
977        crate::triggers::event::LinearEventPayload::CustomerRequest(payload) => {
978            payload.common.webhook_timestamp
979        }
980        crate::triggers::event::LinearEventPayload::Other(payload) => payload.webhook_timestamp,
981    }?;
982    OffsetDateTime::from_unix_timestamp_nanos(i128::from(timestamp) * 1_000_000).ok()
983}
984
985fn parse_args<T: for<'de> Deserialize<'de>>(args: JsonValue) -> Result<T, ClientError> {
986    serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
987}
988
989fn load_secret_text_blocking(
990    ctx: &ConnectorCtx,
991    secret_id: &SecretId,
992) -> Result<String, ConnectorError> {
993    let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
994    secret.with_exposed(|bytes| {
995        std::str::from_utf8(bytes)
996            .map(|value| value.to_string())
997            .map_err(|error| {
998                ConnectorError::Secret(format!(
999                    "secret '{}' is not valid UTF-8: {error}",
1000                    secret_id
1001                ))
1002            })
1003    })
1004}
1005
1006fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1007    let trimmed = raw?.trim();
1008    if trimmed.is_empty() {
1009        return None;
1010    }
1011    let (base, version) = match trimmed.rsplit_once('@') {
1012        Some((base, version_text)) => {
1013            let version = version_text.parse::<u64>().ok()?;
1014            (base, SecretVersion::Exact(version))
1015        }
1016        None => (trimmed, SecretVersion::Latest),
1017    };
1018    let (namespace, name) = base.split_once('/')?;
1019    if namespace.is_empty() || name.is_empty() {
1020        return None;
1021    }
1022    Some(SecretId::new(namespace, name).with_version(version))
1023}
1024
/// Looks up a header by name, ignoring ASCII case.
///
/// When several keys differ only by case, the first one in the map's sorted
/// key order is returned. Returns `None` if no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1031
1032fn fallback_body_digest(body: &[u8]) -> String {
1033    use sha2::Digest;
1034
1035    let digest = sha2::Sha256::digest(body);
1036    let mut encoded = String::with_capacity(digest.len() * 2);
1037    for byte in digest {
1038        encoded.push_str(&format!("{byte:02x}"));
1039    }
1040    format!("sha256:{encoded}")
1041}
1042
1043fn graphql_meta(headers: &reqwest::header::HeaderMap, query: &str) -> JsonValue {
1044    let estimated = estimate_query_complexity(query);
1045    let observed = header_i64(headers, "x-complexity");
1046    let complexity = observed.or(estimated);
1047    let complexity_warning = complexity.is_some_and(|value| value >= COMPLEXITY_WARNING_THRESHOLD);
1048    json!({
1049        "complexity_estimate": estimated,
1050        "observed_complexity": observed,
1051        "complexity_warning": complexity_warning,
1052        "rate_limit": {
1053            "requests_limit": header_i64(headers, "x-ratelimit-requests-limit"),
1054            "requests_remaining": header_i64(headers, "x-ratelimit-requests-remaining"),
1055            "requests_reset": header_i64(headers, "x-ratelimit-requests-reset"),
1056            "complexity_limit": header_i64(headers, "x-ratelimit-complexity-limit"),
1057            "complexity_remaining": header_i64(headers, "x-ratelimit-complexity-remaining"),
1058            "complexity_reset": header_i64(headers, "x-ratelimit-complexity-reset"),
1059        },
1060    })
1061}
1062
1063fn header_i64(headers: &reqwest::header::HeaderMap, name: &str) -> Option<i64> {
1064    headers
1065        .get(name)
1066        .and_then(|value| value.to_str().ok())
1067        .and_then(|value| value.parse::<i64>().ok())
1068}
1069
/// Produces a rough complexity score for a GraphQL query string.
///
/// Returns `None` for an empty or whitespace-only query. Otherwise the score
/// is the sum of:
/// - a base of `1`,
/// - the number of `{` and `}` characters (at least `1`),
/// - for every `first:` / `last:` occurrence, the integer literal that follows
///   it, or `50` when no literal that fits an `i64` follows (for example a
///   variable reference),
/// - `50` for every occurrence of `nodes`.
///
/// All additions saturate at `i64::MAX`, so an oversized pagination literal
/// cannot overflow the score into a panic or a negative value.
fn estimate_query_complexity(query: &str) -> Option<i64> {
    const DEFAULT_PAGE_WEIGHT: i64 = 50;
    const NODES_WEIGHT: i64 = 50;

    let normalized = query.trim();
    if normalized.is_empty() {
        return None;
    }

    // Counts come back as `usize`; clamp instead of casting so the conversion
    // can never wrap.
    let to_i64 = |count: usize| i64::try_from(count).unwrap_or(i64::MAX);

    let brace_count = normalized
        .chars()
        .filter(|ch| matches!(ch, '{' | '}'))
        .count();
    let mut score = 1i64.saturating_add(to_i64(brace_count).max(1));

    for keyword in ["first:", "last:"] {
        let mut rest = normalized;
        while let Some(index) = rest.find(keyword) {
            let after = &rest[index + keyword.len()..];
            // Take the run of ASCII digits that follows optional whitespace.
            let candidate = after.trim_start();
            let digits_end = candidate
                .find(|ch: char| !ch.is_ascii_digit())
                .unwrap_or(candidate.len());
            let weight = candidate[..digits_end]
                .parse::<i64>()
                .unwrap_or(DEFAULT_PAGE_WEIGHT);
            score = score.saturating_add(weight);
            rest = after;
        }
    }

    let nodes_count = to_i64(normalized.matches("nodes").count());
    score = score.saturating_add(nodes_count.saturating_mul(NODES_WEIGHT));
    Some(score)
}
1099
1100fn extract_graphql_field(mut envelope: JsonValue, field: &str) -> Result<JsonValue, ClientError> {
1101    let meta = envelope.get("meta").cloned().unwrap_or(JsonValue::Null);
1102    let Some(data) = envelope.get_mut("data").and_then(JsonValue::as_object_mut) else {
1103        return Err(ClientError::Transport(
1104            "linear connector response missing GraphQL data".to_string(),
1105        ));
1106    };
1107    let mut extracted = data.remove(field).ok_or_else(|| {
1108        ClientError::Transport(format!(
1109            "linear connector response missing `{field}` GraphQL field"
1110        ))
1111    })?;
1112    if let Some(object) = extracted.as_object_mut() {
1113        object.insert("meta".to_string(), meta);
1114    }
1115    Ok(extracted)
1116}
1117
1118fn is_rate_limited(status: StatusCode, errors: Option<&JsonValue>) -> bool {
1119    if status == StatusCode::TOO_MANY_REQUESTS {
1120        return true;
1121    }
1122    if status == StatusCode::BAD_REQUEST {
1123        return graphql_error_codes(errors)
1124            .into_iter()
1125            .any(|code| code.eq_ignore_ascii_case("RATELIMITED"));
1126    }
1127    false
1128}
1129
1130fn graphql_error_codes(errors: Option<&JsonValue>) -> Vec<String> {
1131    errors
1132        .and_then(JsonValue::as_array)
1133        .map(|errors| {
1134            errors
1135                .iter()
1136                .filter_map(|error| {
1137                    error
1138                        .get("extensions")
1139                        .and_then(JsonValue::as_object)
1140                        .and_then(|extensions| extensions.get("code"))
1141                        .and_then(JsonValue::as_str)
1142                        .map(ToString::to_string)
1143                })
1144                .collect()
1145        })
1146        .unwrap_or_default()
1147}
1148
1149fn graphql_error_message(status: StatusCode, errors: Option<&JsonValue>) -> String {
1150    let messages = errors
1151        .and_then(JsonValue::as_array)
1152        .map(|errors| {
1153            errors
1154                .iter()
1155                .filter_map(|error| {
1156                    error
1157                        .get("message")
1158                        .and_then(JsonValue::as_str)
1159                        .map(ToString::to_string)
1160                })
1161                .collect::<Vec<_>>()
1162        })
1163        .unwrap_or_default();
1164    if messages.is_empty() {
1165        format!("linear GraphQL request failed with status {status}")
1166    } else {
1167        format!(
1168            "linear GraphQL request failed with status {status}: {}",
1169            messages.join("; ")
1170        )
1171    }
1172}