1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Duration as StdDuration;
4
5use async_trait::async_trait;
6use reqwest::StatusCode;
7use serde::Deserialize;
8use serde_json::{json, Map as JsonMap, Value as JsonValue};
9use time::{Duration, OffsetDateTime};
10use tokio::sync::watch;
11use tokio::task::JoinHandle;
12
13use crate::connectors::{
14 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15 HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::secrets::{SecretId, SecretVersion};
18use crate::triggers::{
19 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
20 TriggerEvent, TriggerEventId,
21};
22
23#[cfg(test)]
24mod tests;
25
26pub const LINEAR_PROVIDER_ID: &str = "linear";
27const DEFAULT_API_BASE_URL: &str = "https://api.linear.app/graphql";
28const DEFAULT_REPLAY_WINDOW_SECS: i64 = 60;
29const DEFAULT_REPLAY_GRACE_SECS: i64 = 15;
30const DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS: u64 = 60;
31const DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD: u32 = 5;
32const COMPLEXITY_WARNING_THRESHOLD: i64 = 5_000;
33
34pub struct LinearConnector {
35 provider_id: ProviderId,
36 kinds: Vec<TriggerKind>,
37 state: Arc<LinearConnectorState>,
38 client: Arc<LinearClient>,
39}
40
41#[derive(Default)]
42struct LinearConnectorState {
43 ctx: RwLock<Option<ConnectorCtx>>,
44 bindings: RwLock<HashMap<String, ActivatedLinearBinding>>,
45 monitor_tasks: Mutex<Vec<JoinHandle<()>>>,
46 monitor_shutdown: Mutex<Option<watch::Sender<bool>>>,
47}
48
49#[derive(Clone, Debug)]
50struct ActivatedLinearBinding {
51 #[allow(dead_code)]
52 binding_id: String,
53 path: Option<String>,
54 signing_secret: SecretId,
55 replay_grace_secs: i64,
56 monitor: Option<LinearWebhookMonitor>,
57}
58
59struct LinearClient {
60 provider_id: ProviderId,
61 state: Arc<LinearConnectorState>,
62 http: reqwest::Client,
63}
64
65#[derive(Debug, Default, Deserialize)]
66struct LinearBindingConfig {
67 #[serde(default, rename = "match")]
68 match_config: LinearMatchConfig,
69 #[serde(default)]
70 secrets: LinearSecretsConfig,
71 #[serde(default)]
72 security: LinearSecurityConfig,
73 #[serde(default)]
74 replay_grace_secs: Option<i64>,
75 #[serde(default)]
76 monitor: Option<LinearWebhookMonitorConfig>,
77}
78
79#[derive(Debug, Default, Deserialize)]
80struct LinearMatchConfig {
81 path: Option<String>,
82}
83
84#[derive(Clone, Debug, Default, Deserialize)]
85struct LinearSecretsConfig {
86 signing_secret: Option<String>,
87 access_token: Option<String>,
88 api_key: Option<String>,
89}
90
91#[derive(Debug, Default, Deserialize)]
92struct LinearSecurityConfig {
93 replay_grace_secs: Option<i64>,
94 timestamp_grace_secs: Option<i64>,
95}
96
97#[derive(Clone, Debug, Default, Deserialize)]
98struct LinearClientConfigArgs {
99 api_base_url: Option<String>,
100 api_key: Option<String>,
101 api_key_secret: Option<String>,
102 access_token: Option<String>,
103 access_token_secret: Option<String>,
104 #[serde(default)]
105 secrets: LinearSecretsConfig,
106}
107
108#[derive(Clone, Debug, Default, Deserialize)]
109struct LinearWebhookMonitorConfig {
110 #[serde(default)]
111 enabled: Option<bool>,
112 webhook_id: Option<String>,
113 health_url: Option<String>,
114 #[serde(default)]
115 probe_interval_secs: Option<u64>,
116 #[serde(default)]
117 probe_interval_ms: Option<u64>,
118 #[serde(default)]
119 success_threshold: Option<u32>,
120 #[serde(flatten)]
121 client: LinearClientConfigArgs,
122}
123
124#[derive(Clone, Debug)]
125struct ResolvedLinearClientConfig {
126 api_base_url: String,
127 auth: LinearAuthSource,
128}
129
130#[derive(Clone, Debug)]
131enum LinearAuthSource {
132 ApiKeyInline(String),
133 ApiKeySecret(SecretId),
134 AccessTokenInline(String),
135 AccessTokenSecret(SecretId),
136}
137
138#[derive(Clone, Debug)]
139struct LinearWebhookMonitor {
140 webhook_id: String,
141 health_url: String,
142 probe_interval: StdDuration,
143 success_threshold: u32,
144 client_config: ResolvedLinearClientConfig,
145}
146
147#[derive(Debug, Deserialize)]
148struct ListIssuesArgs {
149 #[serde(flatten)]
150 config: LinearClientConfigArgs,
151 #[serde(default)]
152 filter: Option<JsonValue>,
153 #[serde(default)]
154 first: Option<i64>,
155 #[serde(default)]
156 after: Option<String>,
157 #[serde(default)]
158 include_archived: Option<bool>,
159}
160
161#[derive(Debug, Deserialize)]
162struct UpdateIssueArgs {
163 #[serde(flatten)]
164 config: LinearClientConfigArgs,
165 id: String,
166 changes: JsonValue,
167}
168
169#[derive(Debug, Deserialize)]
170struct CreateCommentArgs {
171 #[serde(flatten)]
172 config: LinearClientConfigArgs,
173 issue_id: String,
174 body: String,
175}
176
177#[derive(Debug, Deserialize)]
178struct SearchArgs {
179 #[serde(flatten)]
180 config: LinearClientConfigArgs,
181 query: String,
182 #[serde(default)]
183 first: Option<i64>,
184}
185
186#[derive(Debug, Deserialize)]
187struct GraphqlArgs {
188 #[serde(flatten)]
189 config: LinearClientConfigArgs,
190 query: String,
191 #[serde(default)]
192 variables: Option<JsonValue>,
193 #[serde(default)]
194 operation_name: Option<String>,
195}
196
197impl LinearConnector {
198 pub fn new() -> Self {
199 let state = Arc::new(LinearConnectorState::default());
200 let client = Arc::new(LinearClient {
201 provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
202 state: state.clone(),
203 http: crate::connectors::outbound_http_client("harn-linear-connector"),
204 });
205 Self {
206 provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
207 kinds: vec![TriggerKind::from("webhook")],
208 state,
209 client,
210 }
211 }
212
213 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedLinearBinding, ConnectorError> {
214 let bindings = self
215 .state
216 .bindings
217 .read()
218 .expect("linear connector bindings poisoned");
219 if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
220 return bindings.get(binding_id).cloned().ok_or_else(|| {
221 ConnectorError::Unsupported(format!(
222 "linear connector has no active binding `{binding_id}`"
223 ))
224 });
225 }
226 if bindings.len() == 1 {
227 return bindings
228 .values()
229 .next()
230 .cloned()
231 .ok_or_else(|| ConnectorError::Activation("linear bindings missing".to_string()));
232 }
233 Err(ConnectorError::Unsupported(
234 "linear connector requires raw.metadata.binding_id when multiple bindings are active"
235 .to_string(),
236 ))
237 }
238
239 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
240 self.state
241 .ctx
242 .read()
243 .expect("linear connector ctx poisoned")
244 .clone()
245 .ok_or_else(|| {
246 ConnectorError::Activation(
247 "linear connector must be initialized before use".to_string(),
248 )
249 })
250 }
251
252 fn stop_monitors(&self) {
253 if let Some(shutdown) = self
254 .state
255 .monitor_shutdown
256 .lock()
257 .expect("linear connector monitor shutdown poisoned")
258 .take()
259 {
260 let _ = shutdown.send(true);
261 }
262 let mut tasks = self
263 .state
264 .monitor_tasks
265 .lock()
266 .expect("linear connector monitor tasks poisoned");
267 for task in tasks.drain(..) {
268 task.abort();
269 }
270 }
271}
272
273impl Default for LinearConnector {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279impl Drop for LinearConnector {
280 fn drop(&mut self) {
281 self.stop_monitors();
282 }
283}
284
285#[async_trait]
286impl Connector for LinearConnector {
287 fn provider_id(&self) -> &ProviderId {
288 &self.provider_id
289 }
290
291 fn kinds(&self) -> &[TriggerKind] {
292 &self.kinds
293 }
294
295 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
296 *self
297 .state
298 .ctx
299 .write()
300 .expect("linear connector ctx poisoned") = Some(ctx);
301 Ok(())
302 }
303
304 async fn activate(
305 &self,
306 bindings: &[TriggerBinding],
307 ) -> Result<ActivationHandle, ConnectorError> {
308 self.stop_monitors();
309 let mut configured = HashMap::new();
310 let mut paths = BTreeSet::new();
311 for binding in bindings {
312 let activated = ActivatedLinearBinding::from_binding(binding)?;
313 if let Some(path) = &activated.path {
314 if !paths.insert(path.clone()) {
315 return Err(ConnectorError::Activation(format!(
316 "linear connector path `{path}` is configured by multiple bindings"
317 )));
318 }
319 }
320 configured.insert(binding.binding_id.clone(), activated);
321 }
322 let (monitor_shutdown, _) = watch::channel(false);
323 *self
324 .state
325 .bindings
326 .write()
327 .expect("linear connector bindings poisoned") = configured;
328 *self
329 .state
330 .monitor_shutdown
331 .lock()
332 .expect("linear connector monitor shutdown poisoned") = Some(monitor_shutdown.clone());
333 let tasks = {
334 let bindings = self
335 .state
336 .bindings
337 .read()
338 .expect("linear connector bindings poisoned");
339 bindings
340 .values()
341 .filter_map(|binding| {
342 binding.monitor.clone().map(|monitor| {
343 let client = self.client.clone();
344 let shutdown = monitor_shutdown.subscribe();
345 tokio::spawn(async move {
346 run_webhook_monitor(client, monitor, shutdown).await;
347 })
348 })
349 })
350 .collect::<Vec<_>>()
351 };
352 *self
353 .state
354 .monitor_tasks
355 .lock()
356 .expect("linear connector monitor tasks poisoned") = tasks;
357 Ok(ActivationHandle::new(
358 self.provider_id.clone(),
359 bindings.len(),
360 ))
361 }
362
363 async fn shutdown(&self, deadline: StdDuration) -> Result<(), ConnectorError> {
364 if let Some(shutdown) = self
365 .state
366 .monitor_shutdown
367 .lock()
368 .expect("linear connector monitor shutdown poisoned")
369 .take()
370 {
371 let _ = shutdown.send(true);
372 }
373 let pending = self
374 .state
375 .monitor_tasks
376 .lock()
377 .expect("linear connector monitor tasks poisoned")
378 .drain(..)
379 .collect::<Vec<_>>();
380 if pending.is_empty() {
381 return Ok(());
382 }
383 let wait_all = async {
384 for task in pending {
385 let _ = task.await;
386 }
387 };
388 tokio::time::timeout(deadline, wait_all)
389 .await
390 .map_err(|_| {
391 ConnectorError::Activation(format!(
392 "linear connector shutdown exceeded {}s",
393 deadline.as_secs()
394 ))
395 })?;
396 Ok(())
397 }
398
399 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
400 let ctx = self.ctx()?;
401 let binding = self.binding_for_raw(&raw)?;
402 let provider = self.provider_id.clone();
403 let received_at = raw.received_at;
404 let headers = effective_headers(&raw.headers);
405 let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
406 let timestamp_window =
407 Duration::seconds(DEFAULT_REPLAY_WINDOW_SECS + binding.replay_grace_secs.max(0));
408 let verify = futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
409 ctx.event_log.as_ref(),
410 &provider,
411 HmacSignatureStyle::linear(),
412 &raw.body,
413 &headers,
414 secret.as_str(),
415 Some(timestamp_window),
416 received_at,
417 ));
418 if let Err(error) = verify {
419 if matches!(error, ConnectorError::TimestampOutOfWindow { .. }) {
420 ctx.metrics.record_linear_timestamp_rejection();
421 }
422 return Err(error);
423 }
424
425 let payload = raw.json_body()?;
426 let dedupe_key = header_value(&headers, "linear-delivery")
427 .map(ToString::to_string)
428 .or_else(|| {
429 payload
430 .get("webhookId")
431 .and_then(JsonValue::as_str)
432 .map(ToString::to_string)
433 })
434 .unwrap_or_else(|| fallback_body_digest(&raw.body));
435 let kind = linear_trigger_kind(&payload);
436 let provider_payload = ProviderPayload::normalize(&provider, &kind, &headers, payload)
437 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
438 Ok(TriggerEvent {
439 id: TriggerEventId::new(),
440 provider,
441 kind,
442 received_at,
443 occurred_at: raw
444 .occurred_at
445 .or_else(|| infer_occurred_at(&provider_payload)),
446 dedupe_key,
447 trace_id: TraceId::new(),
448 tenant_id: raw.tenant_id.clone(),
449 headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
450 batch: None,
451 raw_body: Some(raw.body.clone()),
452 provider_payload,
453 signature_status: SignatureStatus::Verified,
454 dedupe_claimed: false,
455 })
456 }
457
458 fn payload_schema(&self) -> ProviderPayloadSchema {
459 ProviderPayloadSchema::named("LinearEventPayload")
460 }
461
462 fn client(&self) -> Arc<dyn ConnectorClient> {
463 self.client.clone()
464 }
465}
466
467#[async_trait]
468impl ConnectorClient for LinearClient {
469 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
470 match method {
471 "list_issues" => {
472 let args: ListIssuesArgs = parse_args(args)?;
473 let config = self.resolve_client_config(&args.config)?;
474 let envelope = self
475 .request_graphql(
476 &config,
477 "query ListIssues($filter: IssueFilter, $first: Int, $after: String, $includeArchived: Boolean) { issues(filter: $filter, first: $first, after: $after, includeArchived: $includeArchived) { nodes { id identifier title priority estimate dueDate url createdAt updatedAt state { id name type } team { id key name } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } pageInfo { hasNextPage endCursor } } }",
478 json!({
479 "filter": args.filter,
480 "first": args.first,
481 "after": args.after,
482 "includeArchived": args.include_archived,
483 }),
484 Some("ListIssues"),
485 )
486 .await?;
487 extract_graphql_field(envelope, "issues")
488 }
489 "update_issue" => {
490 let args: UpdateIssueArgs = parse_args(args)?;
491 let config = self.resolve_client_config(&args.config)?;
492 let envelope = self
493 .request_graphql(
494 &config,
495 "mutation UpdateIssue($id: String!, $input: IssueUpdateInput!) { issueUpdate(id: $id, input: $input) { success issue { id identifier title priority estimate dueDate url updatedAt state { id name type } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } } }",
496 json!({
497 "id": args.id,
498 "input": args.changes,
499 }),
500 Some("UpdateIssue"),
501 )
502 .await?;
503 extract_graphql_field(envelope, "issueUpdate")
504 }
505 "create_comment" => {
506 let args: CreateCommentArgs = parse_args(args)?;
507 let config = self.resolve_client_config(&args.config)?;
508 let envelope = self
509 .request_graphql(
510 &config,
511 "mutation CreateComment($input: CommentCreateInput!) { commentCreate(input: $input) { success comment { id body url createdAt user { id name } issue { id identifier title } } } }",
512 json!({
513 "input": {
514 "issueId": args.issue_id,
515 "body": args.body,
516 }
517 }),
518 Some("CreateComment"),
519 )
520 .await?;
521 extract_graphql_field(envelope, "commentCreate")
522 }
523 "search" => {
524 let args: SearchArgs = parse_args(args)?;
525 let config = self.resolve_client_config(&args.config)?;
526 self.search_issues(&config, &args.query, args.first).await
527 }
528 "graphql" => {
529 let args: GraphqlArgs = parse_args(args)?;
530 let config = self.resolve_client_config(&args.config)?;
531 self.request_graphql(
532 &config,
533 &args.query,
534 args.variables.unwrap_or(JsonValue::Null),
535 args.operation_name.as_deref(),
536 )
537 .await
538 }
539 other => Err(ClientError::MethodNotFound(format!(
540 "linear connector does not implement outbound method `{other}`"
541 ))),
542 }
543 }
544}
545
546impl LinearClient {
547 fn resolve_client_config(
548 &self,
549 args: &LinearClientConfigArgs,
550 ) -> Result<ResolvedLinearClientConfig, ClientError> {
551 resolve_client_config_args(args).map_err(ClientError::InvalidArgs)
552 }
553
554 fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
555 self.state
556 .ctx
557 .read()
558 .expect("linear connector ctx poisoned")
559 .clone()
560 .ok_or_else(|| ClientError::Other("linear connector must be initialized".to_string()))
561 }
562
563 async fn request_graphql(
564 &self,
565 config: &ResolvedLinearClientConfig,
566 query: &str,
567 variables: JsonValue,
568 operation_name: Option<&str>,
569 ) -> Result<JsonValue, ClientError> {
570 let ctx = self.ctx()?;
571 ctx.rate_limiter
572 .scoped(&self.provider_id, "graphql")
573 .acquire()
574 .await;
575
576 let auth = self.auth_header(config).await?;
577 if let Some(error) =
578 crate::egress::client_error_for_url("connector_call:linear", &config.api_base_url)
579 {
580 return Err(error);
581 }
582 let mut request_body = JsonMap::new();
583 request_body.insert("query".to_string(), JsonValue::String(query.to_string()));
584 if !variables.is_null() {
585 request_body.insert("variables".to_string(), variables);
586 }
587 if let Some(operation_name) = operation_name {
588 request_body.insert(
589 "operationName".to_string(),
590 JsonValue::String(operation_name.to_string()),
591 );
592 }
593
594 let response = self
595 .http
596 .post(config.api_base_url.clone())
597 .header("Content-Type", "application/json")
598 .header("Authorization", auth)
599 .json(&JsonValue::Object(request_body))
600 .send()
601 .await
602 .map_err(|error| ClientError::Transport(error.to_string()))?;
603
604 let status = response.status();
605 let meta = graphql_meta(response.headers(), query);
606 let payload = response
607 .json::<JsonValue>()
608 .await
609 .map_err(|error| ClientError::Transport(error.to_string()))?;
610 let errors = payload.get("errors").cloned();
611 if is_rate_limited(status, errors.as_ref()) {
612 return Err(ClientError::RateLimited(graphql_error_message(
613 status,
614 errors.as_ref(),
615 )));
616 }
617 if !status.is_success() {
618 return Err(ClientError::Transport(graphql_error_message(
619 status,
620 errors.as_ref(),
621 )));
622 }
623
624 Ok(json!({
625 "data": payload.get("data").cloned().unwrap_or(JsonValue::Null),
626 "errors": errors.unwrap_or(JsonValue::Null),
627 "meta": meta,
628 }))
629 }
630
631 async fn auth_header(
632 &self,
633 config: &ResolvedLinearClientConfig,
634 ) -> Result<String, ClientError> {
635 match &config.auth {
636 LinearAuthSource::ApiKeyInline(value) => Ok(value.clone()),
637 LinearAuthSource::AccessTokenInline(value) => Ok(format!("Bearer {value}")),
638 LinearAuthSource::ApiKeySecret(secret_id) => {
639 let secret = self.secret_text(secret_id).await?;
640 Ok(secret)
641 }
642 LinearAuthSource::AccessTokenSecret(secret_id) => {
643 let secret = self.secret_text(secret_id).await?;
644 Ok(format!("Bearer {secret}"))
645 }
646 }
647 }
648
649 async fn secret_text(&self, secret_id: &SecretId) -> Result<String, ClientError> {
650 let ctx = self.ctx()?;
651 let secret = ctx
652 .secrets
653 .get(secret_id)
654 .await
655 .map_err(|error| ClientError::Other(error.to_string()))?;
656 Ok(secret.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string()))
657 }
658
659 async fn search_issues(
660 &self,
661 config: &ResolvedLinearClientConfig,
662 query: &str,
663 first: Option<i64>,
664 ) -> Result<JsonValue, ClientError> {
665 let searches = [
666 (
667 "query SearchIssues($query: String!, $first: Int) { searchIssues(query: $query, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
668 "SearchIssues",
669 ),
670 (
671 "query SearchIssues($term: String!, $first: Int) { searchIssues(term: $term, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
672 "SearchIssues",
673 ),
674 ];
675
676 for (index, (document, operation_name)) in searches.into_iter().enumerate() {
677 let variables = if document.contains("$query") {
678 json!({ "query": query, "first": first })
679 } else {
680 json!({ "term": query, "first": first })
681 };
682 match self
683 .request_graphql(config, document, variables, Some(operation_name))
684 .await
685 {
686 Ok(envelope) => return extract_graphql_field(envelope, "searchIssues"),
687 Err(ClientError::Transport(message))
688 if index == 0
689 && (message.contains("Unknown argument")
690 || message.contains("GRAPHQL_VALIDATION_FAILED")) => {}
691 Err(error) => return Err(error),
692 }
693 }
694
695 Err(ClientError::Transport(
696 "linear connector searchIssues query failed".to_string(),
697 ))
698 }
699
700 async fn probe_health(&self, health_url: &str) -> Result<bool, ClientError> {
701 if let Some(error) =
702 crate::egress::client_error_for_url("connector_call:linear", health_url)
703 {
704 return Err(error);
705 }
706 let response = self
707 .http
708 .get(health_url)
709 .send()
710 .await
711 .map_err(|error| ClientError::Transport(error.to_string()))?;
712 Ok(response.status() == StatusCode::OK)
713 }
714
715 async fn reenable_webhook(
716 &self,
717 config: &ResolvedLinearClientConfig,
718 webhook_id: &str,
719 ) -> Result<(), ClientError> {
720 let envelope = self
721 .request_graphql(
722 config,
723 "mutation ReenableWebhook($id: String!, $input: WebhookUpdateInput!) { webhookUpdate(id: $id, input: $input) { success webhook { id enabled } } }",
724 json!({
725 "id": webhook_id,
726 "input": { "enabled": true },
727 }),
728 Some("ReenableWebhook"),
729 )
730 .await?;
731 let payload = envelope
732 .get("data")
733 .and_then(|value| value.get("webhookUpdate"))
734 .ok_or_else(|| {
735 ClientError::Transport(
736 "linear connector response missing `webhookUpdate` GraphQL field".to_string(),
737 )
738 })?;
739 if payload
740 .get("success")
741 .and_then(JsonValue::as_bool)
742 .unwrap_or(false)
743 {
744 Ok(())
745 } else {
746 Err(ClientError::Transport(
747 "linear connector webhook re-enable returned success = false".to_string(),
748 ))
749 }
750 }
751}
752
753impl ActivatedLinearBinding {
754 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
755 let config: LinearBindingConfig =
756 serde_json::from_value(binding.config.clone()).map_err(|error| {
757 ConnectorError::Activation(format!(
758 "linear binding `{}` has invalid config: {error}",
759 binding.binding_id
760 ))
761 })?;
762 let signing_secret =
763 parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
764 ConnectorError::Activation(format!(
765 "linear binding `{}` requires secrets.signing_secret",
766 binding.binding_id
767 ))
768 })?;
769 let replay_grace_secs = config
770 .replay_grace_secs
771 .or(config.security.replay_grace_secs)
772 .or(config.security.timestamp_grace_secs)
773 .unwrap_or(DEFAULT_REPLAY_GRACE_SECS);
774 let monitor = config
775 .monitor
776 .map(|monitor| {
777 LinearWebhookMonitor::from_config(&binding.binding_id, monitor, &config.secrets)
778 })
779 .transpose()
780 .map_err(ConnectorError::Activation)?
781 .flatten();
782 Ok(Self {
783 binding_id: binding.binding_id.clone(),
784 path: config.match_config.path,
785 signing_secret,
786 replay_grace_secs,
787 monitor,
788 })
789 }
790}
791
792impl LinearWebhookMonitor {
793 fn from_config(
794 binding_id: &str,
795 config: LinearWebhookMonitorConfig,
796 binding_secrets: &LinearSecretsConfig,
797 ) -> Result<Option<Self>, String> {
798 if config.enabled == Some(false) {
799 return Ok(None);
800 }
801 let webhook_id = config
802 .webhook_id
803 .filter(|value| !value.trim().is_empty())
804 .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires webhook_id"))?;
805 let health_url = config
806 .health_url
807 .filter(|value| !value.trim().is_empty())
808 .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires health_url"))?;
809 let mut client = config.client;
810 if client.access_token.is_none()
811 && client.access_token_secret.is_none()
812 && client.api_key.is_none()
813 && client.api_key_secret.is_none()
814 && client.secrets.access_token.is_none()
815 && client.secrets.api_key.is_none()
816 {
817 client.secrets.access_token = binding_secrets.access_token.clone();
818 client.secrets.api_key = binding_secrets.api_key.clone();
819 }
820 let client_config = resolve_client_config_args(&client).map_err(|error| {
821 format!("linear binding `{binding_id}` monitor auth is invalid: {error}")
822 })?;
823 Ok(Some(Self {
824 webhook_id,
825 health_url,
826 probe_interval: config
827 .probe_interval_ms
828 .map(|ms| StdDuration::from_millis(ms.max(1)))
829 .unwrap_or_else(|| {
830 StdDuration::from_secs(
831 config
832 .probe_interval_secs
833 .unwrap_or(DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS)
834 .max(1),
835 )
836 }),
837 success_threshold: config
838 .success_threshold
839 .unwrap_or(DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD)
840 .max(1),
841 client_config,
842 }))
843 }
844}
845
846async fn run_webhook_monitor(
847 client: Arc<LinearClient>,
848 monitor: LinearWebhookMonitor,
849 mut shutdown: watch::Receiver<bool>,
850) {
851 let mut consecutive_successes = 0u32;
852 loop {
853 tokio::select! {
854 changed = shutdown.changed() => {
855 if changed.is_err() || *shutdown.borrow() {
856 break;
857 }
858 }
859 _ = tokio::time::sleep(monitor.probe_interval) => {}
860 }
861 if *shutdown.borrow() {
862 break;
863 }
864 match client.probe_health(&monitor.health_url).await {
865 Ok(true) => {
866 consecutive_successes = consecutive_successes.saturating_add(1);
867 if consecutive_successes >= monitor.success_threshold
868 && client
869 .reenable_webhook(&monitor.client_config, &monitor.webhook_id)
870 .await
871 .is_ok()
872 {
873 consecutive_successes = 0;
874 }
875 }
876 Ok(false) | Err(_) => {
877 consecutive_successes = 0;
878 }
879 }
880 }
881}
882
883fn resolve_client_config_args(
884 args: &LinearClientConfigArgs,
885) -> Result<ResolvedLinearClientConfig, String> {
886 let auth = if let Some(secret_id) = args
887 .access_token_secret
888 .as_deref()
889 .or(args.secrets.access_token.as_deref())
890 .and_then(|value| parse_secret_id(Some(value)))
891 {
892 LinearAuthSource::AccessTokenSecret(secret_id)
893 } else if let Some(secret_id) = args
894 .api_key_secret
895 .as_deref()
896 .or(args.secrets.api_key.as_deref())
897 .and_then(|value| parse_secret_id(Some(value)))
898 {
899 LinearAuthSource::ApiKeySecret(secret_id)
900 } else if let Some(token) = args.access_token.clone() {
901 LinearAuthSource::AccessTokenInline(token)
902 } else if let Some(api_key) = args.api_key.clone() {
903 LinearAuthSource::ApiKeyInline(api_key)
904 } else {
905 return Err(
906 "linear connector requires access_token, access_token_secret, api_key, or api_key_secret"
907 .to_string(),
908 );
909 };
910
911 Ok(ResolvedLinearClientConfig {
912 api_base_url: args
913 .api_base_url
914 .clone()
915 .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
916 auth,
917 })
918}
919
920fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
921 let mut effective = headers.clone();
922 for (raw, canonical) in [
923 ("content-type", "Content-Type"),
924 ("linear-signature", "Linear-Signature"),
925 ("linear-delivery", "Linear-Delivery"),
926 ("linear-event", "Linear-Event"),
927 ] {
928 if let Some(value) = header_value(headers, raw) {
929 effective
930 .entry(canonical.to_string())
931 .or_insert_with(|| value.to_string());
932 }
933 }
934 effective
935}
936
937fn linear_trigger_kind(payload: &JsonValue) -> String {
938 let event = payload
939 .get("type")
940 .and_then(JsonValue::as_str)
941 .map(|kind| {
942 let lower = kind.to_ascii_lowercase();
943 match lower.as_str() {
944 "issue" => "issue".to_string(),
945 "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
946 "issuelabel" | "issue_label" => "issue_label".to_string(),
947 "project" | "projectupdate" | "project_update" => "project".to_string(),
948 "cycle" => "cycle".to_string(),
949 "customer" => "customer".to_string(),
950 "customerrequest" | "customer_request" => "customer_request".to_string(),
951 _ => lower,
952 }
953 })
954 .unwrap_or_else(|| "other".to_string());
955 let action = payload
956 .get("action")
957 .and_then(JsonValue::as_str)
958 .unwrap_or("update");
959 format!("{event}.{action}")
960}
961
962fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
963 let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Linear(payload)) =
964 payload
965 else {
966 return None;
967 };
968 let timestamp = match payload {
969 crate::triggers::event::LinearEventPayload::Issue(payload) => {
970 payload.common.webhook_timestamp
971 }
972 crate::triggers::event::LinearEventPayload::IssueComment(payload) => {
973 payload.common.webhook_timestamp
974 }
975 crate::triggers::event::LinearEventPayload::IssueLabel(payload) => {
976 payload.common.webhook_timestamp
977 }
978 crate::triggers::event::LinearEventPayload::Project(payload) => {
979 payload.common.webhook_timestamp
980 }
981 crate::triggers::event::LinearEventPayload::Cycle(payload) => {
982 payload.common.webhook_timestamp
983 }
984 crate::triggers::event::LinearEventPayload::Customer(payload) => {
985 payload.common.webhook_timestamp
986 }
987 crate::triggers::event::LinearEventPayload::CustomerRequest(payload) => {
988 payload.common.webhook_timestamp
989 }
990 crate::triggers::event::LinearEventPayload::Other(payload) => payload.webhook_timestamp,
991 }?;
992 OffsetDateTime::from_unix_timestamp_nanos(i128::from(timestamp) * 1_000_000).ok()
993}
994
995fn parse_args<T: for<'de> Deserialize<'de>>(args: JsonValue) -> Result<T, ClientError> {
996 serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
997}
998
999fn load_secret_text_blocking(
1000 ctx: &ConnectorCtx,
1001 secret_id: &SecretId,
1002) -> Result<String, ConnectorError> {
1003 let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
1004 secret.with_exposed(|bytes| {
1005 std::str::from_utf8(bytes)
1006 .map(|value| value.to_string())
1007 .map_err(|error| {
1008 ConnectorError::Secret(format!(
1009 "secret '{}' is not valid UTF-8: {error}",
1010 secret_id
1011 ))
1012 })
1013 })
1014}
1015
1016fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1017 let trimmed = raw?.trim();
1018 if trimmed.is_empty() {
1019 return None;
1020 }
1021 let (base, version) = match trimmed.rsplit_once('@') {
1022 Some((base, version_text)) => {
1023 let version = version_text.parse::<u64>().ok()?;
1024 (base, SecretVersion::Exact(version))
1025 }
1026 None => (trimmed, SecretVersion::Latest),
1027 };
1028 let (namespace, name) = base.split_once('/')?;
1029 if namespace.is_empty() || name.is_empty() {
1030 return None;
1031 }
1032 Some(SecretId::new(namespace, name).with_version(version))
1033}
1034
1035fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
1036 headers
1037 .iter()
1038 .find(|(key, _)| key.eq_ignore_ascii_case(name))
1039 .map(|(_, value)| value.as_str())
1040}
1041
1042fn fallback_body_digest(body: &[u8]) -> String {
1043 use sha2::Digest;
1044
1045 let digest = sha2::Sha256::digest(body);
1046 let mut encoded = String::with_capacity(digest.len() * 2);
1047 for byte in digest {
1048 encoded.push_str(&format!("{byte:02x}"));
1049 }
1050 format!("sha256:{encoded}")
1051}
1052
1053fn graphql_meta(headers: &reqwest::header::HeaderMap, query: &str) -> JsonValue {
1054 let estimated = estimate_query_complexity(query);
1055 let observed = header_i64(headers, "x-complexity");
1056 let complexity = observed.or(estimated);
1057 let complexity_warning = complexity.is_some_and(|value| value >= COMPLEXITY_WARNING_THRESHOLD);
1058 json!({
1059 "complexity_estimate": estimated,
1060 "observed_complexity": observed,
1061 "complexity_warning": complexity_warning,
1062 "rate_limit": {
1063 "requests_limit": header_i64(headers, "x-ratelimit-requests-limit"),
1064 "requests_remaining": header_i64(headers, "x-ratelimit-requests-remaining"),
1065 "requests_reset": header_i64(headers, "x-ratelimit-requests-reset"),
1066 "complexity_limit": header_i64(headers, "x-ratelimit-complexity-limit"),
1067 "complexity_remaining": header_i64(headers, "x-ratelimit-complexity-remaining"),
1068 "complexity_reset": header_i64(headers, "x-ratelimit-complexity-reset"),
1069 },
1070 })
1071}
1072
1073fn header_i64(headers: &reqwest::header::HeaderMap, name: &str) -> Option<i64> {
1074 headers
1075 .get(name)
1076 .and_then(|value| value.to_str().ok())
1077 .and_then(|value| value.parse::<i64>().ok())
1078}
1079
1080fn estimate_query_complexity(query: &str) -> Option<i64> {
1081 let normalized = query.trim();
1082 if normalized.is_empty() {
1083 return None;
1084 }
1085 let mut score = 1i64;
1086 let field_count =
1087 normalized.matches('{').count() as i64 + normalized.matches('}').count() as i64;
1088 score += field_count.max(1);
1089 for window in ["first:", "last:"] {
1090 let mut rest = normalized;
1091 while let Some(index) = rest.find(window) {
1092 let after = &rest[index + window.len()..];
1093 let digits: String = after
1094 .chars()
1095 .skip_while(|ch| ch.is_whitespace())
1096 .take_while(|ch| ch.is_ascii_digit())
1097 .collect();
1098 if let Ok(value) = digits.parse::<i64>() {
1099 score += value;
1100 } else {
1101 score += 50;
1102 }
1103 rest = after;
1104 }
1105 }
1106 score += (normalized.matches("nodes").count() as i64) * 50;
1107 Some(score)
1108}
1109
1110fn extract_graphql_field(mut envelope: JsonValue, field: &str) -> Result<JsonValue, ClientError> {
1111 let meta = envelope.get("meta").cloned().unwrap_or(JsonValue::Null);
1112 let Some(data) = envelope.get_mut("data").and_then(JsonValue::as_object_mut) else {
1113 return Err(ClientError::Transport(
1114 "linear connector response missing GraphQL data".to_string(),
1115 ));
1116 };
1117 let mut extracted = data.remove(field).ok_or_else(|| {
1118 ClientError::Transport(format!(
1119 "linear connector response missing `{field}` GraphQL field"
1120 ))
1121 })?;
1122 if let Some(object) = extracted.as_object_mut() {
1123 object.insert("meta".to_string(), meta);
1124 }
1125 Ok(extracted)
1126}
1127
1128fn is_rate_limited(status: StatusCode, errors: Option<&JsonValue>) -> bool {
1129 if status == StatusCode::TOO_MANY_REQUESTS {
1130 return true;
1131 }
1132 if status == StatusCode::BAD_REQUEST {
1133 return graphql_error_codes(errors)
1134 .into_iter()
1135 .any(|code| code.eq_ignore_ascii_case("RATELIMITED"));
1136 }
1137 false
1138}
1139
1140fn graphql_error_codes(errors: Option<&JsonValue>) -> Vec<String> {
1141 errors
1142 .and_then(JsonValue::as_array)
1143 .map(|errors| {
1144 errors
1145 .iter()
1146 .filter_map(|error| {
1147 error
1148 .get("extensions")
1149 .and_then(JsonValue::as_object)
1150 .and_then(|extensions| extensions.get("code"))
1151 .and_then(JsonValue::as_str)
1152 .map(ToString::to_string)
1153 })
1154 .collect()
1155 })
1156 .unwrap_or_default()
1157}
1158
1159fn graphql_error_message(status: StatusCode, errors: Option<&JsonValue>) -> String {
1160 let messages = errors
1161 .and_then(JsonValue::as_array)
1162 .map(|errors| {
1163 errors
1164 .iter()
1165 .filter_map(|error| {
1166 error
1167 .get("message")
1168 .and_then(JsonValue::as_str)
1169 .map(ToString::to_string)
1170 })
1171 .collect::<Vec<_>>()
1172 })
1173 .unwrap_or_default();
1174 if messages.is_empty() {
1175 format!("linear GraphQL request failed with status {status}")
1176 } else {
1177 format!(
1178 "linear GraphQL request failed with status {status}: {}",
1179 messages.join("; ")
1180 )
1181 }
1182}