# Client Examples
Complete examples for integrating V-Queue into your applications.
## Available Client Implementations
The `v-queue-server/examples/` directory contains full client implementations:
- **Python** - `python_client.py`
- **Node.js** - `nodejs_client.js`
- **Rust** - `rust_client.rs`
- **Bash/cURL** - `curl_examples.sh`
See [v-queue-server/examples/README.md](../v-queue-server/examples/README.md) for details.
## Quick Start Examples
### Python Client
#### Simple Consumer
```python
import requests
from requests.auth import HTTPBasicAuth
import time
class VQueueConsumer:
    """Minimal client for the V-Queue consumer HTTP API.

    Args:
        base_url: Root URL of the V-Queue server (e.g. ``http://localhost:9093``).
        username: Optional HTTP Basic user name. Authentication is skipped
            when this is empty or ``None``.
        password: HTTP Basic password, used only together with ``username``.
    """

    def __init__(self, base_url, username=None, password=None):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password) if username else None

    def consume(self, queue, consumer, timeout=30, max_messages=100):
        """Consume messages from queue.

        Args:
            queue: Name of the queue to read from.
            consumer: Name of the consumer reading the queue.
            timeout: Wait time in seconds, sent to the server as ``timeout_ms``.
            max_messages: Sent to the server as ``max_messages``.

        Returns:
            The ``messages`` entry of the JSON response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If no response arrives within ``timeout + 5`` seconds.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/messages"
        params = {"timeout_ms": timeout * 1000, "max_messages": max_messages}
        # requests waits forever unless a timeout is given; allow the requested
        # wait plus a small buffer so a stalled connection cannot hang the loop.
        response = requests.get(
            url, params=params, auth=self.auth, timeout=timeout + 5
        )
        response.raise_for_status()
        return response.json()["messages"]

    def commit(self, queue, consumer, timeout=10):
        """Commit consumer position.

        Args:
            queue: Name of the queue.
            consumer: Name of the consumer whose position is committed.
            timeout: Client-side request timeout in seconds.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If no response arrives within ``timeout`` seconds.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/commit"
        response = requests.post(url, auth=self.auth, timeout=timeout)
        response.raise_for_status()
# Usage: poll the "events" queue forever as consumer "my-service".
consumer = VQueueConsumer("http://localhost:9093", "admin", "password")

while True:
    batch = consumer.consume("events", "my-service", timeout=30)

    for message in batch:
        print(f"Processing: {message['value']}")
        # Your processing logic here

    # Commit only when something was actually received.
    if batch:
        consumer.commit("events", "my-service")
        print(f"Committed {len(batch)} messages")

    time.sleep(1)
```
#### With Error Handling
```python
import requests
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VQueueConsumer:
    """V-Queue consumer client that logs request failures.

    Args:
        base_url: Root URL of the V-Queue server.
        username: Optional HTTP Basic user name. Authentication is skipped
            when this is empty or ``None``.
        password: HTTP Basic password, used only together with ``username``.
    """

    def __init__(self, base_url, username=None, password=None):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password) if username else None

    def consume(self, queue, consumer, timeout=30, max_messages=100):
        """Consume messages with error handling.

        Args:
            queue: Name of the queue to read from.
            consumer: Name of the consumer reading the queue.
            timeout: Wait time in seconds, sent to the server as ``timeout_ms``.
            max_messages: Sent to the server as ``max_messages``.

        Returns:
            The ``messages`` entry of the JSON response, or ``[]`` when the
            request times out or the server answers 404.

        Raises:
            requests.HTTPError: For any error status other than 404.
            requests.RequestException: For any other request failure.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/messages"
        params = {"timeout_ms": timeout * 1000, "max_messages": max_messages}
        try:
            # Add extra timeout to request (server timeout + buffer)
            response = requests.get(
                url,
                params=params,
                auth=self.auth,
                timeout=timeout + 5,
            )
            response.raise_for_status()
            return response.json()["messages"]
        except requests.Timeout:
            logger.warning("Request timed out")
            return []
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                # NOTE(review): the message says "created" although 404 means
                # "not found" -- confirm the intended wording.
                logger.info("Queue or consumer created")
                return []
            logger.error(f"HTTP error: {e}")
            raise
        except RequestException as e:
            logger.error(f"Request failed: {e}")
            raise

    def commit(self, queue, consumer, timeout=10):
        """Commit with error handling.

        Args:
            queue: Name of the queue.
            consumer: Name of the consumer whose position is committed.
            timeout: Client-side request timeout in seconds.

        Raises:
            requests.RequestException: If the request fails, times out, or the
                server answers with an error status. The failure is logged
                before being re-raised.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/commit"
        try:
            # A timeout keeps a stalled connection from blocking the consumer
            # loop indefinitely (requests has no default timeout).
            response = requests.post(url, auth=self.auth, timeout=timeout)
            response.raise_for_status()
            logger.info("Position committed")
        except RequestException as e:
            logger.error(f"Commit failed: {e}")
            raise
def main():
    """Run the consume/process/commit loop until interrupted with Ctrl-C.

    A message whose processing raises is logged and skipped; the batch is
    still committed afterwards. Any other error in an iteration is logged and
    followed by a 5 second pause before the next attempt.
    """
    client = VQueueConsumer("http://localhost:9093", "admin", "password")
    queue, consumer_name = "events", "my-service"

    while True:
        try:
            batch = client.consume(queue, consumer_name, timeout=30)

            for msg in batch:
                try:
                    # Process message
                    process_message(msg)
                except Exception as e:
                    logger.error(f"Failed to process message {msg['offset']}: {e}")
                    # Decide: continue or stop processing batch

            if not batch:
                continue
            client.commit(queue, consumer_name)
            logger.info(f"Processed {len(batch)} messages")
        except KeyboardInterrupt:
            logger.info("Shutting down...")
            return
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            time.sleep(5)  # Back off on error
def process_message(msg):
    """Print the offset, type and value of one message."""
    offset, kind, payload = msg['offset'], msg['msg_type'], msg['value']
    print(f"Offset: {offset}, Type: {kind}, Value: {payload}")


if __name__ == "__main__":
    main()
```
### Node.js Client
#### Simple Consumer
```javascript
const axios = require('axios');
/**
 * Minimal client for the V-Queue consumer HTTP API.
 */
class VQueueConsumer {
  /**
   * @param {string} baseUrl - Root URL of the V-Queue server.
   * @param {string} [username] - HTTP Basic user name; auth is skipped when falsy.
   * @param {string} [password] - HTTP Basic password, used only with `username`.
   */
  constructor(baseUrl, username, password) {
    this.client = axios.create({
      baseURL: baseUrl,
      auth: username ? { username, password } : undefined
    });
  }

  /**
   * Fetch messages for a consumer.
   *
   * @param {string} queue - Queue name.
   * @param {string} consumer - Consumer name.
   * @param {number} [timeout=30] - Wait time in seconds, sent as `timeout_ms`.
   * @param {number} [maxMessages=100] - Sent as `max_messages`.
   * @returns {Promise<Array>} The `messages` array from the response body.
   * @throws If the request fails, times out, or returns an error status.
   */
  async consume(queue, consumer, timeout = 30, maxMessages = 100) {
    const response = await this.client.get(
      `/api/v1/queues/${queue}/consumers/${consumer}/messages`,
      {
        params: { timeout_ms: timeout * 1000, max_messages: maxMessages },
        // axios applies no timeout by default; allow the requested wait plus
        // a buffer so a stalled connection cannot hang the loop forever.
        timeout: timeout * 1000 + 5000
      }
    );
    return response.data.messages;
  }

  /**
   * Commit the consumer's position.
   *
   * @param {string} queue - Queue name.
   * @param {string} consumer - Consumer name.
   * @returns {Promise<void>}
   * @throws If the request fails or returns an error status.
   */
  async commit(queue, consumer) {
    await this.client.post(
      `/api/v1/queues/${queue}/consumers/${consumer}/commit`
    );
  }
}
// Usage: poll the "events" queue forever as consumer "my-service".
async function main() {
  const consumer = new VQueueConsumer('http://localhost:9093', 'admin', 'password');
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  for (;;) {
    const batch = await consumer.consume('events', 'my-service', 30);

    for (const msg of batch) {
      console.log(`Processing: ${msg.value}`);
      // Your processing logic
    }

    // Commit only when something was actually received.
    if (batch.length > 0) {
      await consumer.commit('events', 'my-service');
      console.log(`Committed ${batch.length} messages`);
    }

    await pause(1000);
  }
}

main().catch(console.error);
```
#### With Error Handling
```javascript
const axios = require('axios');
/**
 * V-Queue consumer client that turns timeouts and 404 responses into empty
 * batches and logs commit failures before rethrowing them.
 */
class VQueueConsumer {
  /**
   * @param {string} baseUrl - Root URL of the V-Queue server.
   * @param {string} [username] - HTTP Basic user name; auth is skipped when falsy.
   * @param {string} [password] - HTTP Basic password, used only with `username`.
   */
  constructor(baseUrl, username, password) {
    const auth = username ? { username, password } : undefined;
    this.client = axios.create({
      baseURL: baseUrl,
      auth,
      timeout: 35000 // 35 seconds
    });
  }

  /**
   * Fetch messages for a consumer.
   *
   * @param {string} queue - Queue name.
   * @param {string} consumer - Consumer name.
   * @param {number} [timeout=30] - Wait time in seconds, sent as `timeout_ms`.
   * @param {number} [maxMessages=100] - Sent as `max_messages`.
   * @returns {Promise<Array>} The `messages` array, or `[]` on a timeout
   *   (`ECONNABORTED`) or a 404 response.
   * @throws Any other request error.
   */
  async consume(queue, consumer, timeout = 30, maxMessages = 100) {
    try {
      const path = `/api/v1/queues/${queue}/consumers/${consumer}/messages`;
      const options = {
        params: { timeout_ms: timeout * 1000, max_messages: maxMessages },
        timeout: timeout * 1000 + 5000 // Add buffer
      };
      const response = await this.client.get(path, options);
      return response.data.messages;
    } catch (err) {
      if (err.code === 'ECONNABORTED') {
        console.log('Request timed out');
        return [];
      }
      if (err.response?.status === 404) {
        console.log('Queue or consumer created');
        return [];
      }
      throw err;
    }
  }

  /**
   * Commit the consumer's position, logging the outcome.
   *
   * @param {string} queue - Queue name.
   * @param {string} consumer - Consumer name.
   * @returns {Promise<void>}
   * @throws The original request error, after logging its message.
   */
  async commit(queue, consumer) {
    const path = `/api/v1/queues/${queue}/consumers/${consumer}/commit`;
    try {
      await this.client.post(path);
      console.log('Position committed');
    } catch (err) {
      console.error('Commit failed:', err.message);
      throw err;
    }
  }
}
/**
 * Log the offset, type and value of one message.
 *
 * @param {{offset: *, msg_type: *, value: *}} msg - Message to report.
 * @returns {Promise<void>}
 */
async function processMessage(msg) {
  const { offset, msg_type: msgType, value } = msg;
  console.log(`Offset: ${offset}, Type: ${msgType}, Value: ${value}`);
  // Your processing logic
}
/**
 * Run the consume/process/commit loop forever.
 *
 * A message whose processing throws is logged and skipped; the batch is still
 * committed afterwards. Any other error in an iteration is logged and followed
 * by a 5 second pause.
 */
async function main() {
  const consumer = new VQueueConsumer('http://localhost:9093', 'admin', 'password');
  const queue = 'events';
  const consumerName = 'my-service';
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  for (;;) {
    try {
      const batch = await consumer.consume(queue, consumerName, 30);

      for (const msg of batch) {
        try {
          await processMessage(msg);
        } catch (error) {
          console.error(`Failed to process message ${msg.offset}:`, error);
        }
      }

      if (batch.length > 0) {
        await consumer.commit(queue, consumerName);
        console.log(`Processed ${batch.length} messages`);
      }
    } catch (error) {
      console.error('Error in main loop:', error.message);
      await pause(5000); // Back off
    }
  }
}

// Graceful shutdown
process.on('SIGINT', () => {
  console.log('Shutting down...');
  process.exit(0);
});

main().catch(console.error);
```
### Rust Client
#### Simple Consumer
```rust
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use std::thread;
use std::time::Duration;
/// JSON body returned by the messages endpoint; `consume` deserializes the
/// response into this type and returns its `messages`.
#[derive(Debug, Deserialize)]
struct MessagesResponse {
    messages: Vec<Message>,
}
/// One entry of `MessagesResponse::messages`.
#[derive(Debug, Deserialize)]
struct Message {
    // Presumably the message's position in the queue -- confirm against the API reference.
    offset: u64,
    msg_type: String,
    // Kept as untyped JSON, so any JSON value can be deserialized here.
    value: serde_json::Value,
}
/// Blocking HTTP client for the V-Queue consumer endpoints.
struct VQueueConsumer {
    client: Client,
    // Server root URL; endpoint paths are appended to it.
    base_url: String,
    // Basic-auth credentials; they are sent only when both are `Some`.
    username: Option<String>,
    password: Option<String>,
}
impl VQueueConsumer {
fn new(base_url: &str, username: Option<String>, password: Option<String>) -> Self {
Self {
client: Client::new(),
base_url: base_url.to_string(),
username,
password,
}
}
fn consume(
&self,
queue: &str,
consumer: &str,
timeout: u64,
max_messages: u32,
) -> Result<Vec<Message>, Box<dyn std::error::Error>> {
let url = format!(
"{}/api/v1/queues/{}/consumers/{}/messages",
self.base_url, queue, consumer
);
let mut request = self.client.get(&url)
.query(&[("timeout_ms", (timeout * 1000).to_string())])
.query(&[("max_messages", max_messages.to_string())]);
if let (Some(user), Some(pass)) = (&self.username, &self.password) {
request = request.basic_auth(user, Some(pass));
}
let response = request.send()?.json::<MessagesResponse>()?;
Ok(response.messages)
}
fn commit(&self, queue: &str, consumer: &str) -> Result<(), Box<dyn std::error::Error>> {
let url = format!(
"{}/api/v1/queues/{}/consumers/{}/commit",
self.base_url, queue, consumer
);
let mut request = self.client.post(&url);
if let (Some(user), Some(pass)) = (&self.username, &self.password) {
request = request.basic_auth(user, Some(pass));
}
request.send()?;
Ok(())
}
}
/// Polls the "events" queue forever as consumer "my-service".
///
/// The first error from `consume` or `commit` ends the program via `?`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let consumer = VQueueConsumer::new(
        "http://localhost:9093",
        Some(String::from("admin")),
        Some(String::from("password")),
    );
    let pause = Duration::from_secs(1);

    loop {
        let batch = consumer.consume("events", "my-service", 30, 100)?;

        for msg in batch.iter() {
            println!("Processing: {:?}", msg.value);
            // Your processing logic
        }

        // Commit only when something was actually received.
        if !batch.is_empty() {
            consumer.commit("events", "my-service")?;
            println!("Committed {} messages", batch.len());
        }

        thread::sleep(pause);
    }
}
```
### Go Client
```go
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// Message is one entry of MessagesResponse.Messages, decoded from JSON.
type Message struct {
	Offset  int64       `json:"offset"`
	MsgType string      `json:"msg_type"`
	Value   interface{} `json:"value"` // left untyped, so any JSON value decodes
}
// MessagesResponse is the JSON body that Consume decodes from the messages endpoint.
type MessagesResponse struct {
	Messages []Message `json:"messages"`
}
// VQueueConsumer is an HTTP client for the V-Queue consumer endpoints.
type VQueueConsumer struct {
	baseURL  string // server root URL; endpoint paths are appended to it
	username string // Basic auth is applied only when this is non-empty
	password string
	client   *http.Client
}
// NewVQueueConsumer returns a consumer for the server at baseURL whose HTTP
// client uses a 35 second timeout. Pass an empty username to disable Basic auth.
func NewVQueueConsumer(baseURL, username, password string) *VQueueConsumer {
	httpClient := &http.Client{Timeout: 35 * time.Second}

	consumer := &VQueueConsumer{
		baseURL:  baseURL,
		username: username,
		password: password,
		client:   httpClient,
	}
	return consumer
}
// Consume fetches messages for consumer on queue. timeout is in seconds and is
// sent as timeout_ms; maxMessages is sent as max_messages. Any status other
// than 200 is returned as an error that includes the response body.
func (c *VQueueConsumer) Consume(queue, consumer string, timeout, maxMessages int) ([]Message, error) {
	endpoint := fmt.Sprintf(
		"%s/api/v1/queues/%s/consumers/%s/messages?timeout_ms=%d&max_messages=%d",
		c.baseURL, queue, consumer, timeout*1000, maxMessages,
	)

	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	if c.username != "" {
		req.SetBasicAuth(c.username, c.password)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, body)
	}

	var payload MessagesResponse
	decodeErr := json.NewDecoder(resp.Body).Decode(&payload)
	if decodeErr != nil {
		return nil, decodeErr
	}
	return payload.Messages, nil
}
// Commit commits the position of consumer on queue. Any status other than 200
// is returned as an error that includes the response body.
func (c *VQueueConsumer) Commit(queue, consumer string) error {
	endpoint := fmt.Sprintf(
		"%s/api/v1/queues/%s/consumers/%s/commit",
		c.baseURL, queue, consumer,
	)

	req, err := http.NewRequest(http.MethodPost, endpoint, nil)
	if err != nil {
		return err
	}
	if c.username != "" {
		req.SetBasicAuth(c.username, c.password)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	body, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("HTTP %d: %s", resp.StatusCode, body)
}
// main polls the "events" queue forever as consumer "my-service". A failed
// poll is reported and retried after 5 seconds; a failed commit is reported
// and the loop carries on.
func main() {
	consumer := NewVQueueConsumer("http://localhost:9093", "admin", "password")

	for {
		batch, err := consumer.Consume("events", "my-service", 30, 100)
		if err != nil {
			fmt.Printf("Error consuming: %v\n", err)
			time.Sleep(5 * time.Second)
			continue
		}

		for i := range batch {
			fmt.Printf("Processing: %v\n", batch[i].Value)
			// Your processing logic
		}

		if count := len(batch); count > 0 {
			if commitErr := consumer.Commit("events", "my-service"); commitErr != nil {
				fmt.Printf("Error committing: %v\n", commitErr)
			} else {
				fmt.Printf("Committed %d messages\n", count)
			}
		}

		time.Sleep(1 * time.Second)
	}
}
```
## Advanced Patterns
### Batch Processing with Checkpointing
```python
import requests
from requests.auth import HTTPBasicAuth
import time
class VQueueConsumer:
    """Client for the V-Queue consumer HTTP API.

    Args:
        base_url: Root URL of the V-Queue server.
        username: Optional HTTP Basic user name. Authentication is skipped
            when this is empty or ``None``.
        password: HTTP Basic password, used only together with ``username``.
    """

    def __init__(self, base_url, username=None, password=None):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password) if username else None

    def consume(self, queue, consumer, timeout=30, max_messages=100):
        """Fetch messages for ``consumer`` on ``queue``.

        ``timeout`` is in seconds and is sent as ``timeout_ms``; the HTTP
        request itself is allowed ``timeout + 5`` seconds.

        Returns:
            The ``messages`` entry of the JSON response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If the request exceeds its client-side timeout.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/messages"
        params = {"timeout_ms": timeout * 1000, "max_messages": max_messages}
        response = requests.get(
            url, params=params, auth=self.auth, timeout=timeout + 5
        )
        response.raise_for_status()
        return response.json()["messages"]

    def commit(self, queue, consumer, timeout=10):
        """Commit the position of ``consumer`` on ``queue``.

        Args:
            queue: Name of the queue.
            consumer: Name of the consumer whose position is committed.
            timeout: Client-side request timeout in seconds.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If no response arrives within ``timeout`` seconds.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/commit"
        # requests has no default timeout; without one a stalled checkpoint
        # would block batch processing forever.
        response = requests.post(url, auth=self.auth, timeout=timeout)
        response.raise_for_status()
def process_batch(messages, batch_size=10, *, consumer=None,
                  queue="events", consumer_name="my-service"):
    """Process messages in smaller batches with checkpointing.

    Each slice of ``batch_size`` messages is passed to ``process_message`` one
    message at a time, then the consumer position is committed.

    Args:
        messages: Sequence of message dicts; each needs an ``offset`` key.
        batch_size: Number of messages handled between commits (at least 1).
        consumer: Object with a ``commit(queue, consumer_name)`` method. When
            omitted, a ``VQueueConsumer`` for ``http://localhost:9093`` is created.
        queue: Queue name passed to ``commit``.
        consumer_name: Consumer name passed to ``commit``.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    # range() rejects a zero step but silently yields nothing for a negative
    # one, which would skip every message without any error.
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    if consumer is None:
        consumer = VQueueConsumer("http://localhost:9093", "admin", "password")

    for i in range(0, len(messages), batch_size):
        batch = messages[i:i + batch_size]

        # Process batch
        for msg in batch:
            process_message(msg)

        # Checkpoint: commit after each small batch
        consumer.commit(queue, consumer_name)
        print(f"Checkpointed at offset {batch[-1]['offset']}")
def process_message(msg):
    """Print the offset and value of one message."""
    offset, payload = msg['offset'], msg['value']
    print(f"Processing message {offset}: {payload}")
    # Your logic here
```
### Multi-Queue Consumer
```python
import requests
from requests.auth import HTTPBasicAuth
from concurrent.futures import ThreadPoolExecutor
import time
class VQueueConsumer:
    """Client for the V-Queue consumer HTTP API.

    Args:
        base_url: Root URL of the V-Queue server.
        username: Optional HTTP Basic user name. Authentication is skipped
            when this is empty or ``None``.
        password: HTTP Basic password, used only together with ``username``.
    """

    def __init__(self, base_url, username=None, password=None):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password) if username else None

    def consume(self, queue, consumer, timeout=30, max_messages=100):
        """Fetch messages for ``consumer`` on ``queue``.

        ``timeout`` is in seconds and is sent as ``timeout_ms``; the HTTP
        request itself is allowed ``timeout + 5`` seconds.

        Returns:
            The ``messages`` entry of the JSON response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If the request exceeds its client-side timeout.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/messages"
        params = {"timeout_ms": timeout * 1000, "max_messages": max_messages}
        response = requests.get(
            url, params=params, auth=self.auth, timeout=timeout + 5
        )
        response.raise_for_status()
        return response.json()["messages"]

    def commit(self, queue, consumer, timeout=10):
        """Commit the position of ``consumer`` on ``queue``.

        Args:
            queue: Name of the queue.
            consumer: Name of the consumer whose position is committed.
            timeout: Client-side request timeout in seconds.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If no response arrives within ``timeout`` seconds.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/commit"
        # requests has no default timeout; a hung commit would otherwise stall
        # its worker thread permanently.
        response = requests.post(url, auth=self.auth, timeout=timeout)
        response.raise_for_status()
def consume_queue(queue_name, consumer_name):
    """Poll one queue forever, printing each message and committing each batch.

    Any exception in an iteration is printed and followed by a 5 second pause;
    the loop never exits on its own.
    """
    client = VQueueConsumer("http://localhost:9093", "admin", "password")
    tag = f"[{queue_name}]"

    while True:
        try:
            batch = client.consume(queue_name, consumer_name, timeout=30)

            for item in batch:
                print(f"{tag} Processing: {item['value']}")

            if batch:
                client.commit(queue_name, consumer_name)
                print(f"{tag} Committed {len(batch)} messages")
        except Exception as e:
            print(f"{tag} Error: {e}")
            time.sleep(5)
def main():
    """Consume from multiple queues concurrently, one worker thread per queue."""
    queues = [
        ("events", "events-consumer"),
        ("logs", "logs-consumer"),
        ("metrics", "metrics-consumer"),
    ]

    worker_count = len(queues)
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        for queue_name, consumer_name in queues:
            pool.submit(consume_queue, queue_name, consumer_name)


if __name__ == "__main__":
    main()
```
### Error Recovery and Retry
```python
import requests
from requests.auth import HTTPBasicAuth
import time
from functools import wraps
def retry(max_attempts=3, delay=1, backoff=2):
    """Retry decorator with exponential backoff.

    The wrapped function is called until it returns or has failed
    ``max_attempts`` times. After each failure except the last, the wrapper
    prints a notice, sleeps, and multiplies the sleep time by ``backoff``.

    Args:
        max_attempts: Total number of calls allowed (at least 1).
        delay: Seconds to sleep after the first failure.
        backoff: Factor applied to the delay after every failure.

    Returns:
        A decorator. The decorated function returns whatever the wrapped
        function returns and re-raises the exception of the final attempt.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    # With fewer than one attempt the loop below would never call the function
    # and the wrapper would silently return None.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            current_delay = delay
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    # Out of attempts: surface the original exception.
                    if attempts >= max_attempts:
                        raise
                    print(f"Attempt {attempts} failed: {e}. Retrying in {current_delay}s...")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
class VQueueConsumer:
    """V-Queue consumer client whose requests are retried on failure.

    Args:
        base_url: Root URL of the V-Queue server.
        username: Optional HTTP Basic user name. Authentication is skipped
            when this is empty or ``None``.
        password: HTTP Basic password, used only together with ``username``.
    """

    def __init__(self, base_url, username=None, password=None):
        self.base_url = base_url
        self.auth = HTTPBasicAuth(username, password) if username else None

    @retry(max_attempts=3, delay=1, backoff=2)
    def consume(self, queue, consumer, timeout=30, max_messages=100):
        """Fetch messages for ``consumer`` on ``queue``, retrying on failure.

        ``timeout`` is in seconds and is sent as ``timeout_ms``; each HTTP
        attempt is allowed ``timeout + 5`` seconds.

        Returns:
            The ``messages`` entry of the JSON response body.

        Raises:
            Exception: Whatever the third failed attempt raised.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/messages"
        params = {"timeout_ms": timeout * 1000, "max_messages": max_messages}
        response = requests.get(
            url, params=params, auth=self.auth, timeout=timeout + 5
        )
        response.raise_for_status()
        return response.json()["messages"]

    @retry(max_attempts=3, delay=1, backoff=2)
    def commit(self, queue, consumer, timeout=10):
        """Commit the position of ``consumer`` on ``queue``, retrying on failure.

        Args:
            queue: Name of the queue.
            consumer: Name of the consumer whose position is committed.
            timeout: Client-side timeout in seconds for each attempt.

        Raises:
            Exception: Whatever the third failed attempt raised.
        """
        url = f"{self.base_url}/api/v1/queues/{queue}/consumers/{consumer}/commit"
        # Without a timeout a stalled request never raises, so the retry
        # logic would never get a chance to run.
        response = requests.post(url, auth=self.auth, timeout=timeout)
        response.raise_for_status()
# Usage
consumer = VQueueConsumer("http://localhost:9093", "admin", "password")
# consume() is decorated with @retry(max_attempts=3, delay=1, backoff=2): a
# failing call is tried up to three times, sleeping 1s and then 2s in between,
# before the last exception propagates.
messages = consumer.consume("events", "my-service") # Auto-retries on failure
```
## Testing
### Mock Server for Testing
```python
# test_consumer.py
import unittest
from unittest.mock import Mock, patch
import requests
class TestVQueueConsumer(unittest.TestCase):
    """Unit tests for ``VQueueConsumer`` with ``requests.get`` patched out.

    NOTE(review): ``VQueueConsumer`` is not imported in this snippet; import it
    from the module where you defined the class before running the tests.
    """

    @patch('requests.get')
    def test_consume_success(self, mock_get):
        """A successful response yields the messages from its JSON body."""
        # Mock successful response
        mock_response = Mock()
        mock_response.json.return_value = {
            "messages": [
                {"offset": 0, "msg_type": "string", "value": "test"}
            ]
        }
        # Replace raise_for_status with a no-op mock so no HTTP error is raised.
        mock_response.raise_for_status = Mock()
        mock_get.return_value = mock_response
        consumer = VQueueConsumer("http://localhost:9093")
        messages = consumer.consume("test-queue", "test-consumer")
        self.assertEqual(len(messages), 1)
        self.assertEqual(messages[0]["value"], "test")

    @patch('requests.get')
    def test_consume_timeout(self, mock_get):
        """A ``requests.Timeout`` is expected to produce an empty list.

        This holds for the "With Error Handling" consumer, whose ``consume``
        catches ``requests.Timeout`` and returns ``[]``. A variant that does
        not catch the exception would let it propagate and fail this test.
        """
        # Mock timeout
        mock_get.side_effect = requests.Timeout()
        consumer = VQueueConsumer("http://localhost:9093")
        messages = consumer.consume("test-queue", "test-consumer")
        self.assertEqual(messages, [])


if __name__ == "__main__":
    unittest.main()
```
## Next Steps
- [API Reference](05-api-reference.md)
- [Performance Tuning](08-performance.md)
- [Full Example Implementations](../v-queue-server/examples/README.md)