oxigdal-pubsub 0.1.3

Google Cloud Pub/Sub integration for OxiGDAL - Pure Rust streaming and messaging
# OxiGDAL Pub/Sub

[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](LICENSE)
[![Pure Rust](https://img.shields.io/badge/100%25-Pure%20Rust-orange.svg)](https://www.rust-lang.org/)

Google Cloud Pub/Sub integration for OxiGDAL - Pure Rust streaming and messaging for geospatial data processing.

## Overview

`oxigdal-pubsub` provides comprehensive support for Google Cloud Pub/Sub messaging, enabling real-time geospatial data streaming, event-driven processing, and distributed system communication.

## Features

### Core Capabilities

- **Publisher** (~1,500 LOC)
  - Async message publishing with batching
  - Ordering keys for sequential message delivery
  - Configurable retry logic with exponential backoff
  - Flow control and backpressure handling
  - Error handling and recovery

- **Subscriber** (~1,500 LOC)
  - Pull and push subscription models
  - Message acknowledgment and negative acknowledgment
  - Flow control with configurable limits
  - Dead letter queue support
  - Automatic acknowledgment deadline extension

- **Schema Support** (~500 LOC) - *feature-gated*
  - Apache Avro schema validation
  - Protocol Buffers schema support
  - Schema encoding and decoding
  - Schema registry management

- **Monitoring** (~300 LOC) - *feature-gated*
  - Latency tracking and metrics collection
  - Publisher and subscriber statistics
  - Custom metric points with labels
  - Metrics export for observability

- **Topic Management** (~600 LOC)
  - Topic creation and configuration
  - Message retention policies
  - Label management
  - Topic statistics and metadata

- **Subscription Management** (~700 LOC)
  - Subscription creation and updates
  - Expiration policies
  - Dead letter policies
  - Retry configurations
  - Subscription seeking (timestamp/snapshot)
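
The exponential backoff listed under Publisher can be pictured with a small std-only sketch. `BackoffSettings` and `backoff_delay` are hypothetical names used for illustration, not part of the `oxigdal-pubsub` API; the sketch only shows one way a growing, capped retry delay might be computed.

```rust
use std::time::Duration;

/// Hypothetical retry settings; field names are assumptions, not the crate's API.
#[derive(Debug, Clone, Copy)]
pub struct BackoffSettings {
    pub initial: Duration,
    pub multiplier: u32,
    pub max_delay: Duration,
    pub max_attempts: u32,
}

/// Delay before retry number `attempt` (0-based), or `None` once attempts are exhausted.
pub fn backoff_delay(settings: &BackoffSettings, attempt: u32) -> Option<Duration> {
    if attempt >= settings.max_attempts {
        return None;
    }
    // initial * multiplier^attempt, saturating instead of overflowing.
    let factor = settings.multiplier.saturating_pow(attempt);
    let delay = settings
        .initial
        .checked_mul(factor)
        .unwrap_or(settings.max_delay);
    // Never wait longer than the configured ceiling.
    Some(delay.min(settings.max_delay))
}
```

With an initial delay of 100 ms, a multiplier of 2, and a 1 s ceiling, the delays grow as 100 ms, 200 ms, 400 ms, 800 ms and then stay at 1 s until the attempt budget runs out.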

## Pure Rust Implementation

This crate uses 100% Pure Rust implementations:

- `google-cloud-pubsub` - Pure Rust Pub/Sub client
- `google-cloud-auth` - Pure Rust authentication
- `apache-avro` - Pure Rust Avro support (optional)
- `prost` - Pure Rust Protocol Buffers (optional)

**No C/Fortran dependencies** - fully compliant with the COOLJAPAN Pure Rust Policy.

## COOLJAPAN Policy Compliance

- **Pure Rust**: 100% Pure Rust, no C/Fortran dependencies
- **No unwrap()**: All error handling uses `Result<T, E>`
- **Files < 2000 lines**: All source files under 2000 lines
- **Workspace dependencies**: Uses workspace-level dependency management
- **Latest crates**: Uses latest available versions from crates.io

## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
oxigdal-pubsub = "0.1"
```

### Feature Flags

```toml
[dependencies.oxigdal-pubsub]
version = "0.1"
features = ["schema", "monitoring", "avro", "protobuf"]
```

Available features:

- `std` (default) - Standard library support
- `async` (default) - Async runtime support
- `publisher` (default) - Publisher functionality
- `subscriber` (default) - Subscriber functionality
- `schema` - Schema validation support
- `avro` - Apache Avro schema support
- `protobuf` - Protocol Buffers schema support
- `monitoring` - Metrics and monitoring
- `batching` - Message batching
- `ordering` - Message ordering
- `flow-control` - Flow control
- `dead-letter` - Dead letter queue support

## Quick Start

### Publishing Messages

```rust
use oxigdal_pubsub::{Publisher, PublisherConfig, Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create publisher configuration
    let config = PublisherConfig::new("my-project", "my-topic")
        .with_batching(true)
        .with_batch_size(100)
        .with_ordering(true);

    // Create publisher
    let publisher = Publisher::new(config).await?;

    // Publish a message
    let message = Message::new(b"Hello, Pub/Sub!")
        .with_attribute("source", "oxigdal")
        .with_attribute("timestamp", "2025-01-27")
        .with_ordering_key("geo-events-1");

    let message_id = publisher.publish(message).await?;
    println!("Published message: {}", message_id);

    // Flush any pending batches
    publisher.flush_all().await?;

    Ok(())
}
```
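
To make the role of `with_batch_size` and `flush_all` in the example above more concrete, here is a minimal, std-only sketch of size-based batching. `Batcher` is a hypothetical type written for this README; how the crate batches internally is not shown here and may differ (for example, by also flushing on a timer or a byte limit).

```rust
/// Hypothetical in-memory batcher; not the crate's internal type.
pub struct Batcher {
    max_messages: usize,
    pending: Vec<Vec<u8>>,
}

impl Batcher {
    /// Create a batcher that emits a batch every `max_messages` payloads (at least 1).
    pub fn new(max_messages: usize) -> Self {
        Self {
            max_messages: max_messages.max(1),
            pending: Vec::new(),
        }
    }

    /// Queue a payload; returns a full batch once the size threshold is reached.
    pub fn push(&mut self, payload: Vec<u8>) -> Option<Vec<Vec<u8>>> {
        self.pending.push(payload);
        if self.pending.len() >= self.max_messages {
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }

    /// Drain whatever is still queued, for example before shutdown.
    pub fn flush(&mut self) -> Vec<Vec<u8>> {
        std::mem::take(&mut self.pending)
    }
}
```

The explicit `flush` step is why a final flush call matters before exiting: without it, a partially filled batch would never be sent.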

### Subscribing to Messages

```rust
use oxigdal_pubsub::{Subscriber, SubscriberConfig, HandlerResult};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create subscriber configuration
    let config = SubscriberConfig::new("my-project", "my-subscription")
        .with_ack_deadline(30)
        .with_handler_concurrency(10);

    // Create subscriber
    let subscriber = Subscriber::new(config).await?;

    // Start subscription with message handler
    let _handle = subscriber.start(|message| {
        println!("Received: {:?}", message.data);
        println!("Attributes: {:?}", message.attributes);

        // Process the message...

        // Return acknowledgment result
        HandlerResult::Ack
    }).await?;

    // Keep running...
    tokio::signal::ctrl_c().await?;

    // Stop the subscriber
    subscriber.stop();

    Ok(())
}
```
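
The handler in the example above always acknowledges. The following std-only sketch illustrates how a handler might choose between acknowledgment and negative acknowledgment. `Received`, `Decision`, `decide`, and `tally` are hypothetical stand-ins defined here for illustration; they are not the crate's message or `HandlerResult` types.

```rust
use std::collections::HashMap;

/// Stand-in for a received message; the real type in the crate may differ.
pub struct Received {
    pub data: Vec<u8>,
    pub attributes: HashMap<String, String>,
}

/// Hypothetical handler outcome: acknowledge, or ask for redelivery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    Ack,
    Nack,
}

/// Ack messages that carry UTF-8 text and a `source` attribute; nack the rest.
pub fn decide(message: &Received) -> Decision {
    let is_text = std::str::from_utf8(&message.data).is_ok();
    let has_source = message.attributes.contains_key("source");
    if is_text && has_source {
        Decision::Ack
    } else {
        Decision::Nack
    }
}

/// Run the handler over a batch and return `(acked, nacked)` counts.
pub fn tally(messages: &[Received]) -> (usize, usize) {
    let acked = messages
        .iter()
        .filter(|m| decide(m) == Decision::Ack)
        .count();
    (acked, messages.len() - acked)
}
```

A message that is nacked is normally redelivered, so a handler that nacks permanently malformed input is best paired with a dead letter policy.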

### Topic Management

```rust
use oxigdal_pubsub::{TopicManager, TopicBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = TopicManager::new("my-project").await?;

    // Create a topic with configuration
    let topic = TopicBuilder::new("my-project", "geo-events")
        .message_retention(86400)  // 24 hours
        .label("env", "production")
        .label("type", "geospatial")
        .message_ordering(true)
        .create(&manager)
        .await?;

    println!("Created topic: {}", topic);

    // List all topics
    let topics = manager.list_topics();
    println!("Available topics: {:?}", topics);

    Ok(())
}
```
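
Pub/Sub addresses a topic by its full resource name, `projects/{project}/topics/{topic}`. As a rough illustration of what the project and topic arguments above turn into, the sketch below builds that path and checks the topic ID against the naming rules documented by Google Cloud (3 to 255 characters, starts with a letter, limited character set, no `goog` prefix). `topic_path` is a hypothetical helper, not a function exported by this crate.

```rust
/// Build `projects/{project}/topics/{topic}` after checking the topic ID.
/// Hypothetical helper for illustration; not part of the crate's API.
pub fn topic_path(project: &str, topic: &str) -> Result<String, String> {
    let starts_with_letter = topic
        .chars()
        .next()
        .map_or(false, |c| c.is_ascii_alphabetic());
    // Letters, digits, and the punctuation Pub/Sub allows in resource IDs.
    let allowed_chars = topic
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || "-._~%+".contains(c));
    let length_ok = (3..=255).contains(&topic.len());

    if !starts_with_letter || !allowed_chars || !length_ok || topic.starts_with("goog") {
        return Err(format!("invalid topic ID: {:?}", topic));
    }
    Ok(format!("projects/{}/topics/{}", project, topic))
}
```

For instance, `topic_path("my-project", "geo-events")` yields `projects/my-project/topics/geo-events`, while an ID such as `1-events` is rejected because it does not start with a letter.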

### Subscription Management

```rust
use oxigdal_pubsub::{
    SubscriptionManager, SubscriptionBuilder,
    DeadLetterPolicy, RetryPolicy, ExpirationPolicy,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = SubscriptionManager::new("my-project").await?;

    // Create a subscription with advanced configuration
    let subscription = SubscriptionBuilder::new(
        "my-project",
        "geo-subscription",
        "geo-events"
    )
    .ack_deadline(45)
    .message_retention(604800)  // 7 days
    .message_ordering(true)
    .dead_letter_policy(DeadLetterPolicy::new("dlq-topic", 5))
    .retry_policy(RetryPolicy::aggressive())
    .expiration_policy(ExpirationPolicy::never_expire())
    .filter("attributes.type=\"geospatial\"")
    .create(&manager)
    .await?;

    println!("Created subscription: {}", subscription);

    Ok(())
}
```
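
The second argument to `DeadLetterPolicy::new` in the example above reads as a maximum number of delivery attempts. The std-only sketch below shows the routing decision such a policy implies. `DeadLetterRule` and `Route` are hypothetical names; the 5 to 100 range mirrors the limits Google Cloud Pub/Sub documents for `max_delivery_attempts`, and the exact cut-over behaviour of the service is approximate, so treat this as an illustration rather than a specification.

```rust
/// Where a negatively acknowledged message goes next.
#[derive(Debug, PartialEq, Eq)]
pub enum Route {
    Redeliver,
    DeadLetter,
}

/// Hypothetical dead-letter rule; not the crate's `DeadLetterPolicy`.
#[derive(Debug)]
pub struct DeadLetterRule {
    max_delivery_attempts: u32,
}

impl DeadLetterRule {
    /// Accept only the range Pub/Sub documents for this setting (5 to 100).
    pub fn new(max_delivery_attempts: u32) -> Result<Self, String> {
        if (5..=100).contains(&max_delivery_attempts) {
            Ok(Self { max_delivery_attempts })
        } else {
            Err(format!(
                "max_delivery_attempts must be in 5..=100, got {}",
                max_delivery_attempts
            ))
        }
    }

    /// Decide what happens after delivery number `delivery_attempt` (1-based) fails.
    pub fn route_after_nack(&self, delivery_attempt: u32) -> Route {
        if delivery_attempt >= self.max_delivery_attempts {
            Route::DeadLetter
        } else {
            Route::Redeliver
        }
    }
}
```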

### Schema Validation

```rust
// Requires the `schema` feature.
use std::sync::Arc;

use oxigdal_pubsub::error::SchemaFormat;
use oxigdal_pubsub::{Schema, SchemaRegistry, SchemaValidator};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut registry = SchemaRegistry::new();

    // Register an Avro schema
    let avro_schema = r#"
    {
        "type": "record",
        "name": "GeoEvent",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "latitude", "type": "double"},
            {"name": "longitude", "type": "double"},
            {"name": "timestamp", "type": "long"}
        ]
    }
    "#;

    let schema = Schema::new(
        "geo-event-schema",
        "GeoEvent",
        SchemaFormat::Avro,
        avro_schema,
    );

    registry.register(schema)?;

    // Validate messages against schema
    let validator = SchemaValidator::new(Arc::new(registry));
    // ... use validator to validate messages

    Ok(())
}
```
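
To give a feel for what validating a message against the `GeoEvent` schema means, here is a deliberately simplified, std-only sketch. It does not parse Avro and does not use the crate's `SchemaValidator`; `FieldType`, `Value`, and `validate` are hypothetical names that only model the "every field present, with the declared type" check.

```rust
use std::collections::HashMap;

/// The three primitive types used by the `GeoEvent` schema above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldType {
    String,
    Double,
    Long,
}

/// A decoded field value (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    String(String),
    Double(f64),
    Long(i64),
}

impl Value {
    fn field_type(&self) -> FieldType {
        match self {
            Value::String(_) => FieldType::String,
            Value::Double(_) => FieldType::Double,
            Value::Long(_) => FieldType::Long,
        }
    }
}

/// Check that `record` has exactly the fields in `schema`, each with the declared type.
pub fn validate(
    schema: &[(&str, FieldType)],
    record: &HashMap<String, Value>,
) -> Result<(), String> {
    for (name, expected) in schema {
        match record.get(*name) {
            None => return Err(format!("missing field `{}`", name)),
            Some(value) if value.field_type() != *expected => {
                return Err(format!(
                    "field `{}` has type {:?}, expected {:?}",
                    name,
                    value.field_type(),
                    expected
                ));
            }
            Some(_) => {}
        }
    }
    // All schema fields matched, so any surplus entries are undeclared fields.
    if record.len() != schema.len() {
        return Err("record contains fields not declared in the schema".to_string());
    }
    Ok(())
}
```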

### Monitoring and Metrics

```rust
// Requires the `monitoring` feature.
use std::sync::Arc;
use std::time::Duration;

use oxigdal_pubsub::{MetricsCollector, MetricsExporter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Share the collector so the exporter and this task can both use it
    let collector = Arc::new(
        MetricsCollector::new("my-project")
            .with_topic("my-topic")
            .with_subscription("my-subscription"),
    );

    // Record metrics
    collector.record_publish(1024, true);
    collector.record_receive(2048);

    // Start metrics exporter
    let exporter = MetricsExporter::new(
        Arc::clone(&collector),
        Duration::from_secs(60),
    );
    let _handle = exporter.start().await?;

    // Export metrics manually
    let metrics = collector.export_metrics();
    println!("Exported {} metric points", metrics.len());

    Ok(())
}
```
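
As a rough picture of what latency tracking involves, the std-only sketch below keeps a running count, total, and maximum using atomics so it can be shared across tasks without a lock. `LatencyStats` is a hypothetical type for illustration and is not the crate's `MetricsCollector`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Hypothetical lock-free latency accumulator with microsecond resolution.
#[derive(Default)]
pub struct LatencyStats {
    count: AtomicU64,
    total_micros: AtomicU64,
    max_micros: AtomicU64,
}

impl LatencyStats {
    /// Record one observed latency.
    pub fn record(&self, latency: Duration) {
        // Clamp to u64 so extremely long durations cannot overflow the conversion.
        let micros = u64::try_from(latency.as_micros()).unwrap_or(u64::MAX);
        self.count.fetch_add(1, Ordering::Relaxed);
        self.total_micros.fetch_add(micros, Ordering::Relaxed);
        self.max_micros.fetch_max(micros, Ordering::Relaxed);
    }

    /// Mean latency, or `None` if nothing has been recorded yet.
    pub fn mean(&self) -> Option<Duration> {
        let count = self.count.load(Ordering::Relaxed);
        if count == 0 {
            return None;
        }
        let total = self.total_micros.load(Ordering::Relaxed);
        Some(Duration::from_micros(total / count))
    }

    /// Largest latency seen so far (zero if nothing was recorded).
    pub fn max(&self) -> Duration {
        Duration::from_micros(self.max_micros.load(Ordering::Relaxed))
    }
}
```

Because every method takes `&self`, a value like this can sit behind an `Arc` and be updated from publisher and subscriber tasks at the same time.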

## Architecture

### Module Structure

```
oxigdal-pubsub/
├── src/
│   ├── lib.rs              # Main library entry point
│   ├── error.rs            # Error types and handling
│   ├── publisher.rs        # Message publishing
│   ├── subscriber.rs       # Message subscription
│   ├── topic.rs            # Topic management
│   ├── subscription.rs     # Subscription management
│   ├── schema.rs           # Schema support (feature-gated)
│   └── monitoring.rs       # Metrics and monitoring (feature-gated)
└── tests/
    └── integration_test.rs # Integration tests
```

### Performance Characteristics

- **Batching**: Automatically batches messages for optimal throughput
- **Flow Control**: Prevents overwhelming subscribers with configurable limits
- **Retry Logic**: Exponential backoff with configurable attempts
- **Latency Tracking**: Sub-millisecond precision for performance monitoring
- **Memory Efficiency**: Zero-copy operations where possible
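
The flow control item above can be illustrated with a small std-only sketch that caps both the number of outstanding messages and their combined size. `FlowController` is a hypothetical type written for this README; the limits and method names are assumptions, not the crate's configuration surface.

```rust
/// Hypothetical flow controller limiting outstanding messages and bytes.
pub struct FlowController {
    max_messages: usize,
    max_bytes: usize,
    outstanding_messages: usize,
    outstanding_bytes: usize,
}

impl FlowController {
    pub fn new(max_messages: usize, max_bytes: usize) -> Self {
        Self {
            max_messages,
            max_bytes,
            outstanding_messages: 0,
            outstanding_bytes: 0,
        }
    }

    /// Reserve capacity for a message of `size` bytes; `false` means "wait".
    pub fn try_acquire(&mut self, size: usize) -> bool {
        let fits = self.outstanding_messages < self.max_messages
            && self.outstanding_bytes.saturating_add(size) <= self.max_bytes;
        if fits {
            self.outstanding_messages += 1;
            self.outstanding_bytes += size;
        }
        fits
    }

    /// Return capacity once a message has been acked or nacked.
    pub fn release(&mut self, size: usize) {
        self.outstanding_messages = self.outstanding_messages.saturating_sub(1);
        self.outstanding_bytes = self.outstanding_bytes.saturating_sub(size);
    }
}
```

A subscriber loop built on this idea would stop pulling when `try_acquire` returns `false` and resume after a `release`, which is what keeps a slow handler from being overwhelmed.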

## Error Handling

All operations return `Result<T, PubSubError>` with comprehensive error types:

```rust
use oxigdal_pubsub::{Message, PubSubError, Publisher};

// Publish one message and report each failure mode separately
async fn publish_and_report(publisher: &Publisher, message: Message) {
    match publisher.publish(message).await {
        Ok(message_id) => println!("Published: {}", message_id),
        Err(PubSubError::MessageTooLarge { size, max_size }) => {
            eprintln!("Message too large: {} > {}", size, max_size);
        }
        Err(PubSubError::Timeout { duration_ms }) => {
            eprintln!("Timeout after {}ms", duration_ms);
        }
        Err(e) if e.is_retryable() => {
            eprintln!("Retryable error: {}", e);
            // Retry logic...
        }
        Err(e) => eprintln!("Error: {}", e),
    }
}
```
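
The `is_retryable()` branch above leaves the retry logic open. One possible shape for it is sketched below using only the standard library. `SketchError` is a hypothetical stand-in that borrows two variant names from the example, and treating only timeouts as retryable is an assumption made for this illustration. The loop retries immediately to stay short; real code would wait between attempts, typically with exponential backoff.

```rust
/// Hypothetical error type mirroring two variants from the example above.
#[derive(Debug, PartialEq, Eq)]
pub enum SketchError {
    MessageTooLarge { size: usize, max_size: usize },
    Timeout { duration_ms: u64 },
}

impl SketchError {
    /// Assumption for this sketch: only timeouts are worth retrying.
    pub fn is_retryable(&self) -> bool {
        matches!(self, SketchError::Timeout { .. })
    }
}

/// Call `op` until it succeeds, fails permanently, or `max_attempts` is reached.
/// `op` receives the 0-based attempt number and always runs at least once.
pub fn retry<T>(
    max_attempts: u32,
    mut op: impl FnMut(u32) -> Result<T, SketchError>,
) -> Result<T, SketchError> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Transient failure with attempts left: try again.
            Err(e) if e.is_retryable() && attempt + 1 < max_attempts => attempt += 1,
            // Permanent failure, or the attempt budget is spent.
            Err(e) => return Err(e),
        }
    }
}
```

Separating the "is this worth retrying" decision from the loop keeps permanent errors, such as an oversized message, from being retried pointlessly.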

## Testing

Run tests:

```bash
# Run all tests
cargo test -p oxigdal-pubsub

# Run with all features
cargo test -p oxigdal-pubsub --all-features

# Run specific test
cargo test -p oxigdal-pubsub test_publisher_config
```

## Statistics

- **Total Lines of Code**: ~3,700 LOC
- **Publisher**: ~1,500 LOC
- **Subscriber**: ~1,500 LOC
- **Topic Management**: ~600 LOC
- **Subscription Management**: ~700 LOC
- **Schema Support**: ~500 LOC
- **Monitoring**: ~300 LOC
- **Error Handling**: ~400 LOC
- **Tests**: ~600 LOC

## License

Apache-2.0

## Authors

COOLJAPAN OU (Team Kitasan)

## Contributing

This crate is part of the OxiGDAL project. Contributions are welcome!

## See Also

- [OxiGDAL Core](../oxigdal-core) - Core geospatial functionality
- [OxiGDAL Cloud](../oxigdal-cloud) - Cloud storage integration
- [OxiGDAL Distributed](../oxigdal-distributed) - Distributed computing
- [OxiGDAL Streaming](../oxigdal-streaming) - Real-time data streaming