otlp2pipeline 0.4.0

OTLP ingestion worker for Cloudflare Pipelines and AWS
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
openapi: 3.0.3
info:
  title: otlp2pipeline API
  description: |
    OTLP ingestion worker that receives OpenTelemetry data and forwards it to
    Cloudflare Pipelines. Provides per-service RED metrics aggregation via
    Durable Objects with SQLite storage.
  version: 0.2.0
  license:
    name: MIT

servers:
  - url: https://{worker}.{account}.workers.dev
    description: Cloudflare Workers deployment
    variables:
      worker:
        default: otlp2pipeline
      account:
        default: your-account

security:
  - bearerAuth: []

paths:
  /health:
    get:
      summary: Health check
      operationId: healthCheck
      tags: [Health]
      security: []
      responses:
        '200':
          description: Service is healthy
          content:
            text/plain:
              schema:
                type: string
                example: ok

  /v1/logs:
    post:
      summary: Ingest OpenTelemetry logs
      operationId: ingestLogs
      tags: [Ingestion]
      description: |
        Accepts OTLP ExportLogsServiceRequest in protobuf or JSON format.
        Supports gzip compression. Logs are dual-written to Cloudflare Pipeline
        and aggregated in Durable Objects for RED metrics.
      requestBody:
        required: true
        content:
          application/x-protobuf:
            schema:
              type: string
              format: binary
              description: OTLP ExportLogsServiceRequest protobuf
          application/json:
            schema:
              $ref: '#/components/schemas/ExportLogsServiceRequest'
      responses:
        '200':
          description: Logs ingested successfully
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/HandleResponse'
        '400':
          description: Invalid request
          content:
            text/plain:
              schema:
                type: string

  /v1/traces:
    post:
      summary: Ingest OpenTelemetry traces
      operationId: ingestTraces
      tags: [Ingestion]
      description: |
        Accepts OTLP ExportTraceServiceRequest in protobuf or JSON format.
        Supports gzip compression. Traces are dual-written to Cloudflare Pipeline
        and aggregated in Durable Objects for RED metrics (including latency).
      requestBody:
        required: true
        content:
          application/x-protobuf:
            schema:
              type: string
              format: binary
              description: OTLP ExportTraceServiceRequest protobuf
          application/json:
            schema:
              $ref: '#/components/schemas/ExportTraceServiceRequest'
      responses:
        '200':
          description: Traces ingested successfully
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/HandleResponse'
        '400':
          description: Invalid request
          content:
            text/plain:
              schema:
                type: string

  /v1/metrics:
    post:
      summary: Ingest OpenTelemetry metrics
      operationId: ingestMetrics
      tags: [Ingestion]
      description: |
        Accepts OTLP ExportMetricsServiceRequest in protobuf or JSON format.
        Supports gauge and sum metric types. Supports gzip compression.
        Metrics are forwarded to Cloudflare Pipeline (no aggregation).
      requestBody:
        required: true
        content:
          application/x-protobuf:
            schema:
              type: string
              format: binary
              description: OTLP ExportMetricsServiceRequest protobuf
          application/json:
            schema:
              $ref: '#/components/schemas/ExportMetricsServiceRequest'
      responses:
        '200':
          description: Metrics ingested successfully
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/HandleResponse'
        '400':
          description: Invalid request
          content:
            text/plain:
              schema:
                type: string

  /v1/services:
    get:
      summary: List all registered services
      operationId: listServices
      tags: [Registry]
      description: |
        Returns all services that have sent telemetry data, along with their
        first-seen timestamp and which signal types have been received.
        Data is stored in a singleton Durable Object with SQLite backend.
        Maximum 10,000 services are tracked (cardinality protection).
      responses:
        '200':
          description: Array of registered services
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/ServiceRecord'
        '500':
          description: Internal error querying Durable Object
          content:
            text/plain:
              schema:
                type: string

  /v1/metrics:
    get:
      summary: List all registered metrics
      operationId: listMetrics
      tags: [Registry]
      description: |
        Returns all metrics that have been ingested, along with their type.
        Data is stored in a singleton Durable Object with SQLite backend.
        Maximum 10,000 unique (name, type) pairs are tracked (cardinality protection).
      responses:
        '200':
          description: Array of registered metrics
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/MetricRecord'
        '500':
          description: Internal error querying Durable Object
          content:
            text/plain:
              schema:
                type: string

  /v1/services/{service}/{signal}/stats:
    get:
      summary: Query aggregated RED metrics
      operationId: getServiceStats
      tags: [Stats]
      description: |
        Query per-minute aggregated statistics for a service and signal type.
        Data is stored in Durable Objects with SQLite backend.
        Default retention is 7 days.
      parameters:
        - name: service
          in: path
          required: true
          description: Service name
          schema:
            type: string
          example: my-service
        - name: signal
          in: path
          required: true
          description: Signal type (logs or traces)
          schema:
            type: string
            enum: [logs, traces]
        - name: from
          in: query
          required: false
          description: Start minute (Unix timestamp in minutes)
          schema:
            type: integer
            format: int64
          example: 28395360
        - name: to
          in: query
          required: false
          description: End minute (Unix timestamp in minutes)
          schema:
            type: integer
            format: int64
          example: 28395420
      responses:
        '200':
          description: Array of per-minute statistics
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/StatsRow'
        '400':
          description: Invalid signal type
          content:
            text/plain:
              schema:
                type: string
        '500':
          description: Internal error querying Durable Object
          content:
            text/plain:
              schema:
                type: string

components:
  securitySchemes:
    bearerAuth:
      type: http
      scheme: bearer
      description: |
        Bearer token authentication. Set AUTH_TOKEN environment variable on the
        worker to enable. When enabled, all endpoints except /health require the
        Authorization header with format: Bearer <token>

  schemas:
    HandleResponse:
      type: object
      properties:
        status:
          type: string
          enum: [ok, error, partial]
          description: |
            Overall status of the request:
            - ok: All records processed successfully
            - partial: Some records failed
            - error: All records failed
        records:
          type: object
          additionalProperties:
            type: integer
          description: Count of records processed per table/signal type
          example:
            logs: 42
        errors:
          type: object
          additionalProperties:
            type: string
          description: Errors encountered per table (omitted if empty)
      required:
        - status
        - records

    ServiceRecord:
      type: object
      description: A registered service with signal availability flags
      properties:
        name:
          type: string
          description: Service name (from service.name resource attribute)
          example: my-service
        first_seen_at:
          type: integer
          format: int64
          description: Unix timestamp in milliseconds when service was first seen
          example: 1704067200000
        has_logs:
          type: integer
          enum: [0, 1]
          description: Whether logs have been received (0 = no, 1 = yes)
        has_traces:
          type: integer
          enum: [0, 1]
          description: Whether traces have been received (0 = no, 1 = yes)
        has_metrics:
          type: integer
          enum: [0, 1]
          description: Whether metrics have been received (0 = no, 1 = yes)
      required:
        - name
        - first_seen_at
        - has_logs
        - has_traces
        - has_metrics

    MetricRecord:
      type: object
      description: A registered metric with its type
      properties:
        name:
          type: string
          description: Metric name (from OTLP metric name field)
          example: http_request_duration_seconds
        metric_type:
          type: string
          description: Metric type (gauge or sum)
          example: gauge
      required:
        - name
        - metric_type

    StatsRow:
      type: object
      properties:
        minute:
          type: integer
          format: int64
          description: Unix timestamp in minutes
        count:
          type: integer
          format: int64
          description: Total record count for this minute
        error_count:
          type: integer
          format: int64
          description: |
            Error count for this minute.
            For logs: severity_number >= 17 (ERROR+).
            For traces: status_code == 2 (ERROR).
        latency_sum_us:
          type: integer
          format: int64
          nullable: true
          description: Sum of latencies in microseconds (traces only)
        latency_min_us:
          type: integer
          format: int64
          nullable: true
          description: Minimum latency in microseconds (traces only)
        latency_max_us:
          type: integer
          format: int64
          nullable: true
          description: Maximum latency in microseconds (traces only)
      required:
        - minute
        - count
        - error_count

    ExportLogsServiceRequest:
      type: object
      description: OTLP logs request (see opentelemetry-proto)
      externalDocs:
        url: https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/collector/logs/v1/logs_service.proto

    ExportTraceServiceRequest:
      type: object
      description: OTLP traces request (see opentelemetry-proto)
      externalDocs:
        url: https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/collector/trace/v1/trace_service.proto

    ExportMetricsServiceRequest:
      type: object
      description: OTLP metrics request (see opentelemetry-proto)
      externalDocs:
        url: https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/collector/metrics/v1/metrics_service.proto

tags:
  - name: Health
    description: Service health endpoints
  - name: Ingestion
    description: Telemetry data ingestion (OTLP)
  - name: Registry
    description: Service discovery and registration
  - name: Stats
    description: Aggregated RED metrics from Durable Objects