# llmrs 0.1.0

Unofficial Rust SDK for the IBM WatsonX AI platform
# Architecture

## Overview

llmrs is a **focused** Rust SDK for **calling IBM WatsonX APIs**:

- **watsonx.ai**: Text generation (streaming and non-streaming), model listing, batch generation, chat completion
- **watsonx.orchestrate**: Agents, threads, send/stream messages

Data and Governance are optional (feature-gated); the default build is AI + Orchestrate only.

Principles:

- **API-first**: One-line connection, clear client methods that map to API calls
- **Async**: Tokio, SSE for streaming
- **Config**: Env-based; credentials in `.env` (see `src/env.rs`, `.env.example`)
- **Errors**: `thiserror` with actionable messages

## Architecture Diagram

```mermaid
graph TB
    A[User Application] --> B[WatsonxClient]
    A --> O[OrchestrateClient]
    A --> D[DataClient]
    A --> G[GovernanceClient]
    
    B --> C[WatsonxConfig]
    B --> DA[Authentication]
    B --> E[Generation API]
    B --> BA[Batch Generation]
    
    C --> F[Environment Variables]
    C --> CA[Configuration Options]
    
    DA --> H[IAM Token API]
    H --> I[Access Token]
    
    E --> J[Streaming Endpoint]
    E --> K[Generation Params]
    E --> N[Non-Streaming Endpoint]
    
    J --> L[SSE Parser]
    L --> M[Stream Callback]
    M --> A
    
    N --> NA[Complete Response]
    NA --> A
    
    BA --> BB[BatchRequest Queue]
    BB --> BC[tokio::spawn Tasks]
    BC --> BD[Concurrent HTTP Requests]
    BD --> BE[BatchGenerationResult]
    BE --> A
    
    B --> NQ[Quality Assessment]
    
    O --> P[OrchestrateConfig]
    O --> Q[Agent API]
    O --> R[Chat API]
    
    P --> F
    P --> S[Instance ID + Region]
    
    Q --> T[/agents endpoint]
    R --> U[/runs/stream endpoint]
    
    U --> V[Orchestrate SSE Parser]
    V --> W[Event Parser]
    W --> X[message.created/delta]
    X --> A
    
    D --> DC[DataConfig]
    D --> DD[Catalog API]
    D --> DE[Schema API]
    D --> DF[Table API]
    D --> DG[SQL Query API]
    
    DC --> F
    DD --> DH[/catalogs endpoint]
    DE --> DI[/schemas endpoint]
    DF --> DJ[/tables endpoint]
    DG --> DK[/sql endpoint]
    
    G --> GC[GovernanceConfig]
    G --> GD[Data Mart API]
    G --> GE[Subscription API]
    G --> GF[Bias Detection API]
    G --> GG[Monitoring API]
    
    GC --> F
    GD --> GH[/data_marts endpoint]
    GE --> GI[/subscriptions endpoint]
    GF --> GJ[/predictions endpoint]
    GG --> GK[/metrics endpoint]
    
    style B fill:#e1f5ff
    style O fill:#ffe1f5
    style D fill:#e8f5e9
    style G fill:#fff4e1
    style J fill:#fff4e1
    style L fill:#e8f5e9
    style V fill:#e8f5e9
    style BA fill:#fff9e1
    style BC fill:#ffe1f5
```

## Component Architecture

### Core Components

#### 1. `WatsonxClient` (src/client.rs)
The main client interface for interacting with WatsonX services.

**Responsibilities:**
- Manage authentication tokens
- Handle HTTP requests to WatsonX API
- Parse SSE streaming responses
- Provide quality assessment
- Execute concurrent batch operations with true parallelism

**Key Methods:**
- `new()` - Create client from configuration
- `from_env()` - Create client from environment variables
- `connect()` - Authenticate and get access token
- `generate()` - Generate text (uses streaming internally)
- `generate_with_config()` - Generate with custom configuration
- `generate_text()` - Standard text generation (returns complete response)
- `generate_text_stream()` - Real-time streaming generation
- `generate_batch()` - Concurrent batch generation with per-request configuration
- `generate_batch_simple()` - Concurrent batch generation with uniform configuration
- `list_models()` - Fetch available foundation models from API

#### 2. `WatsonxConfig` (src/config.rs)
Configuration management for WatsonX client.

**Responsibilities:**
- Load configuration from environment variables
- Validate configuration
- Provide defaults
- Support multiple configuration sources

**Configuration Sources:**
1. Environment variables (`.env` file or system env)
2. Programmatic configuration
3. Default values

**Environment Variables:** See `src/env.rs` and `.env.example`. Credential vars are sensitive; do not log or commit.
- `WATSONX_PROJECT_ID` - WatsonX project ID
- `WATSONX_API_URL` - API base URL (default: us-south)
- `IAM_IBM_CLOUD_URL` - IAM authentication URL
- `WATSONX_API_VERSION` - API version
- `WATSONX_TIMEOUT_SECS` - Request timeout
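
The loading logic can be sketched with `std::env` alone. Variable names match the list above, while the default URL and timeout values here are illustrative assumptions:

```rust
use std::env;

// Sketch of env-based config loading. Variable names come from this
// document; the default URL and timeout values are assumptions.
#[derive(Debug)]
pub struct WatsonxConfig {
    pub project_id: String,
    pub api_url: String,
    pub timeout_secs: u64,
}

// Pure helper so the fallback logic is easy to test in isolation.
pub fn timeout_from(raw: Option<&str>) -> u64 {
    raw.and_then(|s| s.parse().ok()).unwrap_or(30)
}

impl WatsonxConfig {
    pub fn from_env() -> Result<Self, String> {
        Ok(Self {
            project_id: env::var("WATSONX_PROJECT_ID")
                .map_err(|_| "WATSONX_PROJECT_ID is required".to_string())?,
            api_url: env::var("WATSONX_API_URL")
                .unwrap_or_else(|_| "https://us-south.ml.cloud.ibm.com".to_string()),
            timeout_secs: timeout_from(env::var("WATSONX_TIMEOUT_SECS").ok().as_deref()),
        })
    }
}

fn main() {
    // Without WATSONX_PROJECT_ID set, from_env() reports the missing variable.
    match WatsonxConfig::from_env() {
        Ok(cfg) => println!("loaded: {:?}", cfg),
        Err(e) => println!("config error: {}", e),
    }
}
```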

#### 3. `GenerationConfig` (src/types.rs)
Configuration for text generation requests.

**Responsibilities:**
- Define generation parameters
- Provide factory methods for common configurations
- Validate parameter ranges

**Key Parameters:**
- `model_id` - Model to use
- `max_tokens` - Maximum tokens to generate
- `timeout` - Request timeout
- `top_k`, `top_p` - Sampling parameters
- `stop_sequences` - Stop generation triggers
- `repetition_penalty` - Penalty for repetition

**Configuration Presets:**
- `default()` - Standard configuration
- `long_form()` - For long responses (128k tokens)
- `quick_response()` - Fast responses (2k tokens)
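
The presets can be sketched as plain constructors. Field names follow this document; the concrete token and timeout values are assumptions chosen to match the documented "128k" and "2k" descriptions:

```rust
// Sketch of the GenerationConfig presets. The constructor named
// default_config() avoids trait boilerplate; the SDK's default() may
// be a Default impl instead.
#[derive(Debug, Clone)]
pub struct GenerationConfig {
    pub model_id: String,
    pub max_tokens: u32,
    pub timeout_secs: u64,
}

impl GenerationConfig {
    pub fn default_config() -> Self {
        Self {
            model_id: "ibm/granite-4-h-small".to_string(),
            max_tokens: 4_096, // assumed standard limit
            timeout_secs: 30,  // assumed default timeout
        }
    }

    /// For long responses (documented as 128k tokens).
    pub fn long_form() -> Self {
        Self { max_tokens: 128_000, timeout_secs: 300, ..Self::default_config() }
    }

    /// Fast responses (documented as 2k tokens).
    pub fn quick_response() -> Self {
        Self { max_tokens: 2_000, timeout_secs: 15, ..Self::default_config() }
    }
}

fn main() {
    println!("{:?}", GenerationConfig::long_form());
}
```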

#### 4. `Error` Types (src/error.rs)
Comprehensive error handling.

**Error Variants:**
- `Network` - Network/connection errors
- `Authentication` - Auth failures
- `Api` - API errors from WatsonX
- `Timeout` - Request timeouts
- `Serialization` - JSON parsing errors
- `Configuration` - Config validation errors
- `InvalidInput` - Invalid user input
- `RateLimit` - Rate limiting
- `ModelNotFound` - Invalid model
- `ProjectNotFound` - Invalid project
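
A sketch of the variant set, with a hypothetical `is_retryable()` helper that encodes the recovery rules described under "Error Handling Strategy" below (the real `src/error.rs` uses `thiserror`; this dependency-free sketch omits the `Display` derivations):

```rust
use std::time::Duration;

// Sketch of the error variants listed above. Payload shapes (String
// messages, status/body on Api) are assumptions for illustration.
#[derive(Debug)]
pub enum Error {
    Network(String),
    Authentication(String),
    Api { status: u16, body: String },
    Timeout(Duration),
    Serialization(String),
    Configuration(String),
    InvalidInput(String),
    RateLimit,
    ModelNotFound(String),
    ProjectNotFound(String),
}

impl Error {
    /// Whether a retry (possibly after re-authenticating or raising the
    /// timeout) makes sense, per the recovery strategy in this document.
    pub fn is_retryable(&self) -> bool {
        matches!(
            self,
            Error::Network(_) | Error::Timeout(_) | Error::RateLimit | Error::Authentication(_)
        )
    }
}

fn main() {
    let e = Error::Timeout(Duration::from_secs(30));
    println!("{:?} retryable: {}", e, e.is_retryable());
}
```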

#### 5. Model Constants (src/models.rs)
Constants for available WatsonX models.

**Supported Models:**
- `ibm/granite-4-h-small` (default)
- `ibm/granite-3-3-8b-instruct`
- `ibm/granite-3-0-8b-instruct`
- `ibm/granite-3-0-70b-instruct`
- `ibm/granite-2-0-8b-instruct`
- `ibm/granite-2-0-70b-instruct`
- `ibm/granite-1-3-8b-instruct`
- `ibm/granite-1-3-70b-instruct`

#### 6. Model Information (src/types.rs)
Dynamic model information fetched from WatsonX API.

**ModelInfo Structure:**
- `model_id` - Unique model identifier
- `name` - Human-readable model name
- `description` - Model description
- `provider` - Model provider (e.g., IBM)
- `version` - Model version
- `supported_tasks` - List of supported tasks
- `max_context_length` - Maximum context length
- `available` - Availability status

#### 6.1. Batch Generation Types (src/types.rs)
Types for concurrent batch text generation operations.

**BatchRequest:**
- `prompt` - Text prompt for generation
- `config` - Optional per-request configuration (uses default if None)
- `id` - Optional identifier for tracking requests

**BatchItemResult:**
- `id` - Request identifier (if provided)
- `prompt` - Original prompt used
- `result` - Generation result if successful
- `error` - Error if request failed
- Helper methods: `is_success()`, `is_failure()`

**BatchGenerationResult:**
- `results` - Vector of all batch item results
- `total` - Total number of requests
- `successful` - Number of successful requests
- `failed` - Number of failed requests
- `duration` - Total execution time
- Helper methods: `successes()`, `failures()`, `all_succeeded()`, `any_failed()`

**Batch Execution Flow:**
1. Create `BatchRequest` instances with prompts and optional configs
2. Call `generate_batch()` or `generate_batch_simple()`
3. Each request is spawned as a separate `tokio::spawn` task
4. All tasks execute concurrently using shared HTTP client
5. Results are collected and returned as `BatchGenerationResult`
6. Per-item error handling allows partial success
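
The flow above can be sketched end-to-end. The SDK fans out with `tokio::spawn`; this dependency-free sketch substitutes `std::thread` and a stubbed generation call to show the same fan-out/collect pattern with per-item error handling:

```rust
use std::thread;
use std::time::Instant;

// Shape of the batch types from this document (config field omitted to
// keep the sketch short).
pub struct BatchRequest {
    pub id: Option<String>,
    pub prompt: String,
}

pub struct BatchItemResult {
    pub id: Option<String>,
    pub prompt: String,
    pub result: Option<String>,
    pub error: Option<String>,
}

impl BatchItemResult {
    pub fn is_success(&self) -> bool { self.result.is_some() }
}

pub struct BatchGenerationResult {
    pub results: Vec<BatchItemResult>,
    pub total: usize,
    pub successful: usize,
    pub failed: usize,
    pub duration_ms: u128,
}

// Stub standing in for the per-request HTTP call; empty prompts fail so
// the sketch can demonstrate partial success.
fn generate_stub(prompt: &str) -> Result<String, String> {
    if prompt.is_empty() { Err("empty prompt".to_string()) } else { Ok(format!("echo: {}", prompt)) }
}

pub fn generate_batch(requests: Vec<BatchRequest>) -> BatchGenerationResult {
    let start = Instant::now();
    // Fan out: one task per request (tokio::spawn in the SDK).
    let handles: Vec<_> = requests
        .into_iter()
        .map(|req| thread::spawn(move || {
            let outcome = generate_stub(&req.prompt);
            BatchItemResult {
                id: req.id,
                prompt: req.prompt,
                result: outcome.as_ref().ok().cloned(),
                error: outcome.err(),
            }
        }))
        .collect();
    // Collect: wait for every task; each item carries its own error.
    let results: Vec<BatchItemResult> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let successful = results.iter().filter(|r| r.is_success()).count();
    BatchGenerationResult {
        total: results.len(),
        successful,
        failed: results.len() - successful,
        duration_ms: start.elapsed().as_millis(),
        results,
    }
}

fn main() {
    let batch = generate_batch(vec![
        BatchRequest { id: Some("a".into()), prompt: "hello".into() },
        BatchRequest { id: Some("b".into()), prompt: "".into() },
    ]);
    println!("{} ok / {} failed of {}", batch.successful, batch.failed, batch.total);
}
```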

#### 7. `OrchestrateClient` (src/orchestrate/client.rs)
Client for WatsonX Orchestrate agent management and chat functionality.

**Responsibilities:**
- Manage agent discovery and selection
- Handle chat interactions with agents
- Maintain conversation context (thread_id)
- Parse Orchestrate-specific SSE events
- Manage tools, threads, runs, and documents

**Key Methods:**
- `new()` - Create client from OrchestrateConfig
- `list_agents()` - Discover available agents
- `send_message()` - Send message and get response (non-streaming)
- `stream_message()` - Send message with real-time streaming response
- `update_tool()`, `delete_tool()`, `test_tool()` - Tool management
- `get_tool_versions()`, `get_tool_execution_history()` - Tool tracking
- `chat_with_docs()`, `stream_chat_with_docs()` - Document Q&A

**Configuration:**
- Simplified config (matching wxo-client-main pattern)
- Only requires: `instance_id` and `region`
- Loads from environment: `WXO_INSTANCE_ID`, `WXO_REGION`, and a credential variable (see `src/env.rs`)

#### 8. `OrchestrateConfig` (src/orchestrate/config.rs)
Simplified configuration for Watson Orchestrate operations.

**Responsibilities:**
- Load configuration from environment variables
- Construct base URL from instance_id and region
- Provide defaults (region defaults to "us-south")

**Environment Variables** (credential variable: see `src/env.rs`):
- `WXO_INSTANCE_ID` - Watson Orchestrate instance ID (required)
- `WXO_REGION` - Region (optional, defaults to "us-south")

**Base URL Construction:**
```
https://api.{region}.watson-orchestrate.cloud.ibm.com/instances/{instance_id}/v1/orchestrate
```
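
The template maps to a one-line constructor; this sketch mirrors the documented pattern:

```rust
// Builds the Orchestrate base URL from instance_id and region, following
// the template documented above.
pub fn orchestrate_base_url(instance_id: &str, region: &str) -> String {
    format!(
        "https://api.{region}.watson-orchestrate.cloud.ibm.com/instances/{instance_id}/v1/orchestrate"
    )
}

fn main() {
    println!("{}", orchestrate_base_url("abc123", "us-south"));
}
```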

#### 9. `DataClient` (src/data/client.rs)
Client for WatsonX Data catalog, schema, and table management.

**Responsibilities:**
- Manage data catalogs (create, list, get, update, delete)
- Manage schemas (create, list, get, update, delete)
- Manage tables (create, list, get, delete)
- Manage columns (list, add)
- Execute SQL queries

**Key Methods:**
- `list_catalogs()` - List all catalogs for a database
- `create_catalog()` - Create a new catalog
- `list_schemas()` - List all schemas
- `create_schema()` - Create a new schema
- `list_tables()` - List all tables
- `create_table()` - Create a new table
- `execute_sql()` - Execute SQL queries

**Configuration:**
- Requires: `WATSONX_DATA_URL` (service URL)
- Optional: `WATSONX_DATA_API_VERSION` (defaults to "v3")

#### 10. `DataConfig` (src/data/config.rs)
Configuration for WatsonX Data operations.

**Environment Variables:**
- `WATSONX_DATA_URL` - Service instance URL (required)
- `WATSONX_DATA_API_VERSION` - API version (optional, defaults to "v3")

#### 11. `GovernanceClient` (src/governance/client.rs)
Client for WatsonX Governance model monitoring and compliance.

**Responsibilities:**
- Manage data marts (create, list, get, update, delete)
- Manage subscriptions (create, list)
- Compute bias detection and fairness monitoring
- Detect model drift
- Retrieve monitoring metrics

**Key Methods:**
- `list_data_marts()` - List all data marts
- `create_data_mart()` - Create a new data mart
- `create_subscription()` - Create a subscription for model monitoring
- `compute_bias()` - Compute bias mitigation for predictions
- `get_model_drift()` - Get model drift information
- `get_monitoring_metrics()` - Get monitoring metrics

**Configuration:**
- Requires: `WATSONX_GOV_SERVICE_INSTANCE_ID`
- Optional: `WATSONX_GOV_BASE_URL` (defaults to api.aiopenscale.cloud.ibm.com)
- Optional: `WATSONX_GOV_API_VERSION` (defaults to "2025-09-10")

#### 12. `GovernanceConfig` (src/governance/config.rs)
Configuration for WatsonX Governance operations.

**Environment Variables:**
- `WATSONX_GOV_SERVICE_INSTANCE_ID` - Service instance ID (required)
- `WATSONX_GOV_BASE_URL` - Base URL (optional)
- `WATSONX_GOV_API_VERSION` - API version (optional, defaults to "2025-09-10")

#### 13. Orchestrate Module Structure (src/orchestrate/)
Modular organization of Orchestrate functionality.

**Module Organization:**
- `mod.rs` - Module root and re-exports
- `config.rs` - Configuration management
- `client.rs` - Client implementation
- `types.rs` - All types and data structures

**Benefits:**
- Clear separation of concerns
- Easier to maintain and extend
- Logical namespace hierarchy
- Better code discoverability

## Data Flow

### Streaming Generation Flow

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant IAM
    participant API
    participant SSE
    
    User->>Client: generate_text_stream()
    Client->>Client: Check access token
    alt No token
        Client->>IAM: Authenticate
        IAM-->>Client: Access token
    end
    Client->>API: POST /ml/v1/text/generation_stream
    API-->>Client: SSE stream
    loop Parse SSE chunks
        Client->>SSE: Read chunk
        SSE-->>Client: data: {JSON}
        Client->>Client: Parse JSON
        Client->>Client: Extract text
        Client->>User: Callback with chunk
    end
    Client-->>User: GenerationResult
```

### Batch Generation Flow

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant Tokio
    participant API1
    participant API2
    participant API3
    
    User->>Client: generate_batch(requests, config)
    Client->>Client: Clone HTTP client & config
    loop For each request
        Client->>Tokio: tokio::spawn(async task)
        Tokio->>API1: Concurrent HTTP request 1
        Tokio->>API2: Concurrent HTTP request 2
        Tokio->>API3: Concurrent HTTP request 3
    end
    par Parallel execution
        API1-->>Tokio: Response 1
        API2-->>Tokio: Response 2
        API3-->>Tokio: Response 3
    end
    Tokio-->>Client: All tasks complete
    Client->>Client: Collect results
    Client->>Client: Create BatchGenerationResult
    Client-->>User: BatchGenerationResult
```

**Key Characteristics:**
- Each request is spawned as a separate `tokio::spawn` task
- Tasks run in parallel across worker threads on Tokio's multi-threaded runtime
- Shared HTTP client (reqwest::Client uses connection pooling)
- Per-item error handling (partial success supported)
- Results collected once all tasks complete

### Non-streaming Generation Flow

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant IAM
    participant API
    
    User->>Client: generate_text()
    Client->>Client: Check access token
    alt No token
        Client->>IAM: Authenticate
        IAM-->>Client: Access token
    end
    Client->>API: POST /ml/v1/text/generation
    API-->>Client: Complete JSON response
    Client->>Client: Parse JSON
    Client->>Client: Extract generated text
    Client-->>User: GenerationResult
```

### Authentication Flow

```mermaid
sequenceDiagram
    participant Client
    participant IAM
    
    Client->>IAM: POST /identity/token
    Note over Client,IAM: grant_type=apikey
    Note over Client,IAM: apikey={key}
    IAM-->>Client: access_token
    Client->>Client: Store token
```

### Orchestrate Chat Flow (Non-streaming)

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant API
    
    User->>Client: send_message(agent_id, message, thread_id?)
    Client->>API: POST /runs/stream with MessagePayload
    Note over Client,API: IAM-API_KEY header
    API-->>Client: SSE stream with events
    loop Parse SSE events
        Client->>Client: Parse message.created event
        Client->>Client: Extract response text and thread_id
    end
    Client-->>User: (response, new_thread_id)
```

### Orchestrate Chat Flow (Streaming)

```mermaid
sequenceDiagram
    participant User
    participant Client
    participant API
    
    User->>Client: stream_message(agent_id, message, thread_id?, callback)
    Client->>API: POST /runs/stream with MessagePayload
    Note over Client,API: IAM-API_KEY header
    API-->>Client: SSE stream with events
    loop Parse SSE events
        Client->>Client: Parse message.delta events
        Client->>Client: Extract incremental text chunks
        Client->>User: Callback with each chunk (real-time)
        Client->>Client: Track thread_id from events
    end
    Client-->>User: new_thread_id (for conversation continuity)
```

## SSE Parsing

The SDK implements proper Server-Sent Events parsing for streaming responses:

**Format:**
```
data: {"results":[{"generated_text":"chunk1"}]}

data: {"results":[{"generated_text":"chunk2"}]}

data: [DONE]
```

**Parsing Logic:**
1. Read stream chunks as bytes
2. Convert to UTF-8 strings
3. Buffer until complete lines
4. Parse `data:` prefixed lines
5. Extract JSON from data payload
6. Extract generated text from results
7. Call callback with each chunk
8. Accumulate total response
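
Steps 4-8 can be sketched as follows. The SDK uses a real JSON parser for step 5; to stay dependency-free, this sketch extracts `generated_text` with naive substring matching, which only handles simple, escape-free payloads like the samples above:

```rust
// Naive extraction of the generated_text value; a real implementation
// parses the payload as JSON.
fn extract_generated_text(json: &str) -> Option<String> {
    let key = "\"generated_text\":\"";
    let start = json.find(key)? + key.len();
    let end = json[start..].find('"')? + start;
    Some(json[start..end].to_string())
}

/// Feed complete SSE lines; returns the accumulated response and invokes
/// the callback once per chunk (steps 4-8 above).
fn parse_sse_lines<F: FnMut(&str)>(lines: &[&str], mut on_chunk: F) -> String {
    let mut full = String::new();
    for line in lines {
        // Step 4: only lines with the `data:` prefix carry payloads.
        let payload = match line.strip_prefix("data: ") {
            Some(p) => p,
            None => continue,
        };
        if payload.trim() == "[DONE]" {
            break; // end-of-stream sentinel
        }
        if let Some(chunk) = extract_generated_text(payload) {
            on_chunk(&chunk);      // step 7: real-time callback
            full.push_str(&chunk); // step 8: accumulate total response
        }
    }
    full
}

fn main() {
    let lines = [
        r#"data: {"results":[{"generated_text":"Hello, "}]}"#,
        "",
        r#"data: {"results":[{"generated_text":"world"}]}"#,
        "",
        "data: [DONE]",
    ];
    let full = parse_sse_lines(&lines, |c| print!("{}", c));
    println!("\nfull = {:?}", full);
}
```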

### Orchestrate SSE Event Parsing

The Orchestrate client parses Orchestrate-specific SSE events:

**Event Types:**
- `message.created` - Final complete message (for non-streaming)
- `message.delta` - Incremental text chunks (for streaming)

**Format:**
```
{"event":"message.created","data":{"message":{"content":[{"text":"Full response"}],"thread_id":"..."}}}
{"event":"message.delta","data":{"delta":{"content":[{"text":"chunk1"}]},"thread_id":"..."}}
{"event":"message.delta","data":{"delta":{"content":[{"text":"chunk2"}]},"thread_id":"..."}}
```

**Parsing Logic:**
1. Read SSE stream line by line
2. Parse JSON event data
3. For `message.created`: Extract full response text and thread_id
4. For `message.delta`: Extract incremental chunks and call callback
5. Maintain thread_id for conversation continuity
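
A dependency-free sketch of this dispatch, using the same naive field extraction (a real implementation would use a JSON parser):

```rust
// Pulls the string value of a top-level-looking key with naive substring
// matching; sufficient for the escape-free sample events shown above.
fn field<'a>(json: &'a str, key: &str) -> Option<&'a str> {
    let pat = format!("\"{}\":\"", key);
    let start = json.find(&pat)? + pat.len();
    let end = json[start..].find('"')? + start;
    Some(&json[start..end])
}

/// Consumes event lines, returning (final_text, thread_id). Delta chunks
/// are appended in arrival order; a created event carries the full text.
fn consume_events(lines: &[&str]) -> (String, Option<String>) {
    let mut text = String::new();
    let mut thread_id = None;
    for line in lines {
        match field(line, "event") {
            Some("message.created") => {
                if let Some(t) = field(line, "text") {
                    text = t.to_string(); // full response replaces any partial text
                }
                if let Some(id) = field(line, "thread_id") {
                    thread_id = Some(id.to_string());
                }
            }
            Some("message.delta") => {
                if let Some(t) = field(line, "text") {
                    text.push_str(t); // incremental chunk
                }
                if let Some(id) = field(line, "thread_id") {
                    thread_id = Some(id.to_string()); // keep for continuity
                }
            }
            _ => {} // ignore unrelated events
        }
    }
    (text, thread_id)
}

fn main() {
    let lines = [
        r#"{"event":"message.delta","data":{"delta":{"content":[{"text":"Hi "}]},"thread_id":"t1"}}"#,
        r#"{"event":"message.delta","data":{"delta":{"content":[{"text":"there"}]},"thread_id":"t1"}}"#,
    ];
    let (text, tid) = consume_events(&lines);
    println!("{} (thread {:?})", text, tid);
}
```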

## Error Handling Strategy

### Error Propagation
- All async functions return `Result<T, Error>`
- Errors propagate up the call stack
- User can pattern match on error types

### Error Context
- Each error variant includes descriptive message
- Network errors include underlying cause
- API errors include HTTP status and response

### Error Recovery
- Authentication errors can be retried after reconnecting
- Network errors can be retried
- Timeout errors can be resolved by increasing the configured timeout
- Validation errors require corrective action from the user

## Thread Safety

- The client is intentionally not thread-safe; it is designed for single-task async use
- Configuration is thread-safe (immutable after creation)
- Error types are thread-safe
- Generation results are thread-safe

## Performance Considerations

### Streaming
- Uses async streams for real-time processing
- No buffering delays
- Callback fired immediately for each chunk

### Memory
- Streams processed incrementally
- Large responses don't require full memory allocation
- No unnecessary cloning

### Network
- Single connection for streaming
- Token cached in memory (not persisted)
- Connection timeout configurable

## Security

### Credentials
- Never hardcoded in examples
- Load from environment variables
- `.env` file excluded from version control

### API Communication
- HTTPS for all API calls
- Bearer token authentication
- Tokens not logged or exposed

### Dependencies
- All dependencies from crates.io
- Regular security updates
- No unsafe code blocks

## Testing Strategy

### Unit Tests
- Configuration validation
- Quality assessment logic
- Error handling

### Integration Tests
- Configuration from environment
- Client creation
- Model access
- Constant values

### Snapshot Tests
- Configuration snapshots
- Quality assessment results
- Model constants

## WatsonX AI SDK Enhancements (Latest)

### Batch Generation
- ✅ Concurrent batch generation with `generate_batch()` and `generate_batch_simple()`
- ✅ True parallelism using `tokio::spawn` for each request
- ✅ Per-item error handling allowing partial success
- ✅ Flexible configuration (default or per-request)
- ✅ Result tracking with success/failure counts and duration
- ✅ Comprehensive batch types (`BatchRequest`, `BatchItemResult`, `BatchGenerationResult`)
- ✅ Color-coded example demonstrating parallel execution
- ✅ Extracted internal generation method for reusability
- ✅ Comprehensive unit tests

## Orchestrate SDK Enhancements

### Recent Improvements
- ✅ Flexible response parsing for API variations
- ✅ Graceful degradation for unavailable endpoints (404 handling)
- ✅ Optional Tool fields for compatibility with different API versions
- ✅ Comprehensive examples (basic, chat, advanced, use cases, chat with documents)
- ✅ Real-time streaming with proper SSE parsing
- ✅ Thread-based conversation context management
- ✅ Batch message processing support
- ✅ Tool execution and management (execute, update, delete, test)
- ✅ Tool versioning and execution history tracking
- ✅ Chat with documents (Q&A on uploaded documents)
- ✅ Modular code organization (config, client, types)
- ✅ Bearer token authentication with X-Instance-ID headers
- ✅ Multiple endpoint path fallbacks for robustness

### Robustness Features
- Multiple response format support (direct arrays, wrapped objects)
- Fallback parsing strategies for API variations
- Empty collection returns for unavailable endpoints (instead of errors)
- Consistent error handling across all endpoints
- Graceful degradation when optional features are unavailable
- Multiple endpoint path attempts for better compatibility
- Flexible document handling with fallback support

## Future Enhancements

### Potential Improvements
- Connection pooling for better performance
- Token caching with expiration
- Retry logic with exponential backoff
- Metrics and observability
- Enhanced document collection features (full CRUD operations)
- Session management abstraction (thread_id management)
- WebSocket support for Orchestrate (if available)
- Advanced streaming control options

### Architecture Considerations
- Keep client lightweight and simple
- Maintain async-first design
- Preserve type safety
- Keep error handling comprehensive
- Maintain streaming as primary interface for real-time features
- Follow established patterns (wxo-client-main, WatsonX API conventions)
- Keep configuration simple (environment-based, minimal fields)
- Prioritize graceful degradation over strict error handling